Compare commits
12 Commits
test-no-py
...
test-mypy-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a9cf4d35da | ||
|
|
6dfee8b7f6 | ||
|
|
3d15fc8426 | ||
|
|
6bf88e6752 | ||
|
|
6fb026ae6a | ||
|
|
543d0e9d0c | ||
|
|
255e51205d | ||
|
|
beed6247ef | ||
|
|
35b4feaaa9 | ||
|
|
5ac47e095d | ||
|
|
f840f92c46 | ||
|
|
91a50506c5 |
@@ -147,9 +147,9 @@ class KeyAuthorizationChallenge(_TokenChallenge):
|
||||
|
||||
:param response_cls: Subclass of `KeyAuthorizationChallengeResponse`
|
||||
that will be used to generate `response`.
|
||||
|
||||
:param str typ: type of the challenge
|
||||
"""
|
||||
|
||||
typ = NotImplemented
|
||||
response_cls = NotImplemented
|
||||
thumbprint_hash_function = (
|
||||
KeyAuthorizationChallengeResponse.thumbprint_hash_function)
|
||||
|
||||
@@ -12,7 +12,8 @@ import josepy as jose
|
||||
|
||||
from acme import errors
|
||||
# pylint: disable=unused-import, no-name-in-module
|
||||
from acme.magic_typing import Callable, Text, Union
|
||||
from acme.magic_typing import Callable, Union, Tuple, Optional
|
||||
# pylint: enable=unused-import, no-name-in-module
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -135,14 +136,23 @@ def probe_sni(name, host, port=443, timeout=300,
|
||||
|
||||
socket_kwargs = {'source_address': source_address}
|
||||
|
||||
host_protocol_agnostic = None if host == '::' or host == '0' else host
|
||||
host_protocol_agnostic = host
|
||||
if host == '::' or host == '0':
|
||||
# https://github.com/python/typeshed/pull/2136
|
||||
# while PR is not merged, we need to ignore
|
||||
host_protocol_agnostic = None
|
||||
|
||||
try:
|
||||
# pylint: disable=star-args
|
||||
logger.debug("Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
|
||||
" from {0}:{1}".format(source_address[0], source_address[1]) if \
|
||||
socket_kwargs else "")
|
||||
sock = socket.create_connection((host_protocol_agnostic, port), **socket_kwargs)
|
||||
logger.debug(
|
||||
"Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
|
||||
" from {0}:{1}".format(
|
||||
source_address[0],
|
||||
source_address[1]
|
||||
) if socket_kwargs else ""
|
||||
)
|
||||
socket_tuple = (host_protocol_agnostic, port) # type: Tuple[Optional[str], int]
|
||||
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore
|
||||
except socket.error as error:
|
||||
raise errors.Error(error)
|
||||
|
||||
|
||||
@@ -8,6 +8,9 @@ class TypingClass(object):
|
||||
|
||||
try:
|
||||
# mypy doesn't respect modifying sys.modules
|
||||
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
|
||||
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
|
||||
# pylint: disable=unused-import
|
||||
from typing import Collection, IO # type: ignore
|
||||
# pylint: enable=unused-import
|
||||
except ImportError:
|
||||
sys.modules[__name__] = TypingClass()
|
||||
|
||||
@@ -8,7 +8,9 @@ import zope.component
|
||||
|
||||
from acme import challenges
|
||||
from acme import messages
|
||||
|
||||
# pylint: disable=unused-import, no-name-in-module
|
||||
from acme.magic_typing import DefaultDict, Dict, List, Set, Collection
|
||||
# pylint: enable=unused-import, no-name-in-module
|
||||
from certbot import achallenges
|
||||
from certbot import errors
|
||||
from certbot import error_handler
|
||||
@@ -117,7 +119,7 @@ class AuthHandler(object):
|
||||
|
||||
def _solve_challenges(self, aauthzrs):
|
||||
"""Get Responses for challenges from authenticators."""
|
||||
resp = []
|
||||
resp = [] # type: Collection[acme.challenges.ChallengeResponse]
|
||||
all_achalls = self._get_all_achalls(aauthzrs)
|
||||
try:
|
||||
if all_achalls:
|
||||
@@ -133,10 +135,9 @@ class AuthHandler(object):
|
||||
|
||||
def _get_all_achalls(self, aauthzrs):
|
||||
"""Return all active challenges."""
|
||||
all_achalls = []
|
||||
all_achalls = [] # type: Collection[challenges.ChallengeResponse]
|
||||
for aauthzr in aauthzrs:
|
||||
all_achalls.extend(aauthzr.achalls)
|
||||
|
||||
return all_achalls
|
||||
|
||||
def _respond(self, aauthzrs, resp, best_effort):
|
||||
@@ -146,7 +147,8 @@ class AuthHandler(object):
|
||||
|
||||
"""
|
||||
# TODO: chall_update is a dirty hack to get around acme-spec #105
|
||||
chall_update = dict()
|
||||
chall_update = dict() \
|
||||
# type: Dict[int, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
|
||||
self._send_responses(aauthzrs, resp, chall_update)
|
||||
|
||||
# Check for updated status...
|
||||
@@ -198,7 +200,7 @@ class AuthHandler(object):
|
||||
while indices_to_check and rounds < max_rounds:
|
||||
# TODO: Use retry-after...
|
||||
time.sleep(min_sleep)
|
||||
all_failed_achalls = set()
|
||||
all_failed_achalls = set() # type: Set[achallenges.KeyAuthorizationAnnotatedChallenge]
|
||||
for index in indices_to_check:
|
||||
comp_achalls, failed_achalls = self._handle_check(
|
||||
aauthzrs, index, chall_update[index])
|
||||
@@ -424,7 +426,7 @@ def _find_smart_path(challbs, preferences, combinations):
|
||||
|
||||
# max_cost is now equal to sum(indices) + 1
|
||||
|
||||
best_combo = []
|
||||
best_combo = None
|
||||
# Set above completing all of the available challenges
|
||||
best_combo_cost = max_cost
|
||||
|
||||
@@ -479,7 +481,7 @@ def _report_no_chall_path(challbs):
|
||||
msg += (
|
||||
" You may need to use an authenticator "
|
||||
"plugin that can do challenges over DNS.")
|
||||
logger.fatal(msg)
|
||||
logger.critical(msg)
|
||||
raise errors.AuthorizationError(msg)
|
||||
|
||||
|
||||
@@ -522,11 +524,11 @@ def _report_failed_challs(failed_achalls):
|
||||
:class:`certbot.achallenges.AnnotatedChallenge`.
|
||||
|
||||
"""
|
||||
problems = dict()
|
||||
problems = collections.defaultdict(list)\
|
||||
# type: DefaultDict[str, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
|
||||
for achall in failed_achalls:
|
||||
if achall.error:
|
||||
problems.setdefault(achall.error.typ, []).append(achall)
|
||||
|
||||
problems[achall.error.typ].append(achall)
|
||||
reporter = zope.component.getUtility(interfaces.IReporter)
|
||||
for achalls in six.itervalues(problems):
|
||||
reporter.add_message(
|
||||
|
||||
@@ -7,6 +7,7 @@ import re
|
||||
import traceback
|
||||
import zope.component
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
@@ -226,7 +227,7 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
|
||||
def find_matches(candidate_lineage, return_value, acceptable_matches):
|
||||
"""Returns a list of matches using _search_lineages."""
|
||||
acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
|
||||
acceptable_matches_rv = []
|
||||
acceptable_matches_rv = [] # type: List[str]
|
||||
for item in acceptable_matches:
|
||||
if isinstance(item, list):
|
||||
acceptable_matches_rv += item
|
||||
@@ -340,7 +341,7 @@ def _report_human_readable(config, parsed_certs):
|
||||
|
||||
def _describe_certs(config, parsed_certs, parse_failures):
|
||||
"""Print information about the certs we know about"""
|
||||
out = []
|
||||
out = [] # type: List[str]
|
||||
|
||||
notify = out.append
|
||||
|
||||
|
||||
@@ -12,10 +12,14 @@ import sys
|
||||
import configargparse
|
||||
import six
|
||||
import zope.component
|
||||
import zope.interface
|
||||
|
||||
from zope.interface import interfaces as zope_interfaces
|
||||
|
||||
from acme import challenges
|
||||
# pylint: disable=unused-import, no-name-in-module
|
||||
from acme.magic_typing import Any, Dict, Optional
|
||||
# pylint: enable=unused-import, no-name-in-module
|
||||
|
||||
import certbot
|
||||
|
||||
@@ -33,7 +37,7 @@ import certbot.plugins.selection as plugin_selection
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global, to save us from a lot of argument passing within the scope of this module
|
||||
helpful_parser = None
|
||||
helpful_parser = None # type: Optional[HelpfulArgumentParser]
|
||||
|
||||
# For help strings, figure out how the user ran us.
|
||||
# When invoked from letsencrypt-auto, sys.argv[0] is something like:
|
||||
@@ -196,17 +200,17 @@ def set_by_cli(var):
|
||||
(CLI or config file) including if the user explicitly set it to the
|
||||
default. Returns False if the variable was assigned a default value.
|
||||
"""
|
||||
detector = set_by_cli.detector
|
||||
if detector is None:
|
||||
detector = set_by_cli.detector # type: ignore
|
||||
if detector is None and helpful_parser is not None:
|
||||
# Setup on first run: `detector` is a weird version of config in which
|
||||
# the default value of every attribute is wrangled to be boolean-false
|
||||
plugins = plugins_disco.PluginsRegistry.find_all()
|
||||
# reconstructed_args == sys.argv[1:], or whatever was passed to main()
|
||||
reconstructed_args = helpful_parser.args + [helpful_parser.verb]
|
||||
detector = set_by_cli.detector = prepare_and_parse_args(
|
||||
detector = set_by_cli.detector = prepare_and_parse_args( # type: ignore
|
||||
plugins, reconstructed_args, detect_defaults=True)
|
||||
# propagate plugin requests: eg --standalone modifies config.authenticator
|
||||
detector.authenticator, detector.installer = (
|
||||
detector.authenticator, detector.installer = ( # type: ignore
|
||||
plugin_selection.cli_plugin_requests(detector))
|
||||
|
||||
if not isinstance(getattr(detector, var), _Default):
|
||||
@@ -220,7 +224,10 @@ def set_by_cli(var):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
# static housekeeping var
|
||||
# functions attributed are not supported by mypy
|
||||
# https://github.com/python/mypy/issues/2087
|
||||
set_by_cli.detector = None # type: ignore
|
||||
|
||||
|
||||
@@ -236,8 +243,10 @@ def has_default_value(option, value):
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
return (option in helpful_parser.defaults and
|
||||
helpful_parser.defaults[option] == value)
|
||||
if helpful_parser is not None:
|
||||
return (option in helpful_parser.defaults and
|
||||
helpful_parser.defaults[option] == value)
|
||||
return False
|
||||
|
||||
|
||||
def option_was_set(option, value):
|
||||
@@ -254,11 +263,12 @@ def option_was_set(option, value):
|
||||
|
||||
|
||||
def argparse_type(variable):
|
||||
"Return our argparse type function for a config variable (default: str)"
|
||||
"""Return our argparse type function for a config variable (default: str)"""
|
||||
# pylint: disable=protected-access
|
||||
for action in helpful_parser.parser._actions:
|
||||
if action.type is not None and action.dest == variable:
|
||||
return action.type
|
||||
if helpful_parser is not None:
|
||||
for action in helpful_parser.parser._actions:
|
||||
if action.type is not None and action.dest == variable:
|
||||
return action.type
|
||||
return str
|
||||
|
||||
def read_file(filename, mode="rb"):
|
||||
@@ -291,10 +301,12 @@ def flag_default(name):
|
||||
|
||||
def config_help(name, hidden=False):
|
||||
"""Extract the help message for an `.IConfig` attribute."""
|
||||
# pylint: disable=no-member
|
||||
if hidden:
|
||||
return argparse.SUPPRESS
|
||||
else:
|
||||
return interfaces.IConfig[name].__doc__
|
||||
field = interfaces.IConfig.__getitem__(name) # type: zope.interface.interface.Attribute
|
||||
return field.__doc__
|
||||
|
||||
|
||||
class HelpfulArgumentGroup(object):
|
||||
@@ -473,7 +485,7 @@ class HelpfulArgumentParser(object):
|
||||
HELP_TOPICS += list(self.VERBS) + self.COMMANDS_TOPICS + ["manage"]
|
||||
|
||||
plugin_names = list(plugins)
|
||||
self.help_topics = HELP_TOPICS + plugin_names + [None]
|
||||
self.help_topics = HELP_TOPICS + plugin_names + [None] # type: ignore
|
||||
|
||||
self.detect_defaults = detect_defaults
|
||||
self.args = args
|
||||
@@ -492,8 +504,11 @@ class HelpfulArgumentParser(object):
|
||||
short_usage = self._usage_string(plugins, self.help_arg)
|
||||
|
||||
self.visible_topics = self.determine_help_topics(self.help_arg)
|
||||
self.groups = {} # elements are added by .add_group()
|
||||
self.defaults = {} # elements are added by .parse_args()
|
||||
|
||||
# elements are added by .add_group()
|
||||
self.groups = {} # type: Dict[str, argparse._ArgumentGroup]
|
||||
# elements are added by .parse_args()
|
||||
self.defaults = {} # type: Dict[str, Any]
|
||||
|
||||
self.parser = configargparse.ArgParser(
|
||||
prog="certbot",
|
||||
@@ -805,7 +820,6 @@ class HelpfulArgumentParser(object):
|
||||
if self.help_arg:
|
||||
for v in verbs:
|
||||
self.groups[topic].add_argument(v, help=VERB_HELP_MAP[v]["short"])
|
||||
|
||||
return HelpfulArgumentGroup(self, topic)
|
||||
|
||||
def add_plugin_args(self, plugins):
|
||||
@@ -1296,14 +1310,14 @@ def _paths_parser(helpful):
|
||||
verb = helpful.help_arg
|
||||
|
||||
cph = "Path to where certificate is saved (with auth --csr), installed from, or revoked."
|
||||
section = ["paths", "install", "revoke", "certonly", "manage"]
|
||||
sections = ["paths", "install", "revoke", "certonly", "manage"]
|
||||
if verb == "certonly":
|
||||
add(section, "--cert-path", type=os.path.abspath,
|
||||
add(sections, "--cert-path", type=os.path.abspath,
|
||||
default=flag_default("auth_cert_path"), help=cph)
|
||||
elif verb == "revoke":
|
||||
add(section, "--cert-path", type=read_file, required=True, help=cph)
|
||||
add(sections, "--cert-path", type=read_file, required=True, help=cph)
|
||||
else:
|
||||
add(section, "--cert-path", type=os.path.abspath, help=cph)
|
||||
add(sections, "--cert-path", type=os.path.abspath, help=cph)
|
||||
|
||||
section = "paths"
|
||||
if verb in ("install", "revoke"):
|
||||
|
||||
@@ -5,7 +5,9 @@ import os
|
||||
import platform
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
# https://github.com/python/typeshed/blob/master/third_party/
|
||||
# 2/cryptography/hazmat/primitives/asymmetric/rsa.pyi
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key # type: ignore
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
import zope.component
|
||||
@@ -156,11 +158,11 @@ def register(config, account_storage, tos_cb=None):
|
||||
logger.info("Registering without email!")
|
||||
|
||||
# Each new registration shall use a fresh new key
|
||||
key = jose.JWKRSA(key=jose.ComparableRSAKey(
|
||||
rsa.generate_private_key(
|
||||
rsa_key = generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=config.rsa_key_size,
|
||||
backend=default_backend())))
|
||||
backend=default_backend())
|
||||
key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa_key))
|
||||
acme = acme_from_config_key(config, key)
|
||||
# TODO: add phone?
|
||||
regr = perform_registration(acme, config, tos_cb)
|
||||
@@ -605,8 +607,10 @@ def validate_key_csr(privkey, csr=None):
|
||||
if csr.form == "der":
|
||||
csr_obj = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, csr.data)
|
||||
csr = util.CSR(csr.file, OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr_obj), "pem")
|
||||
cert_buffer = OpenSSL.crypto.dump_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr_obj
|
||||
)
|
||||
csr = util.CSR(csr.file, cert_buffer, "pem")
|
||||
|
||||
# If CSR is provided, it must be readable and valid.
|
||||
if csr.data and not crypto_util.valid_csr(csr.data):
|
||||
|
||||
@@ -8,15 +8,18 @@ import hashlib
|
||||
import logging
|
||||
import os
|
||||
|
||||
import OpenSSL
|
||||
|
||||
import pyrfc3339
|
||||
import six
|
||||
import zope.component
|
||||
from OpenSSL import crypto
|
||||
from OpenSSL import SSL # type: ignore
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography import x509 # type: ignore
|
||||
# https://github.com/python/typeshed/tree/master/third_party/2/cryptography
|
||||
from cryptography import x509 # type: ignore
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
|
||||
from acme.magic_typing import IO # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
@@ -47,7 +50,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"):
|
||||
try:
|
||||
key_pem = make_key(key_size)
|
||||
except ValueError as err:
|
||||
logger.exception(err)
|
||||
logger.error("", exc_info=True)
|
||||
raise err
|
||||
|
||||
config = zope.component.getUtility(interfaces.IConfig)
|
||||
@@ -111,11 +114,11 @@ def valid_csr(csr):
|
||||
|
||||
"""
|
||||
try:
|
||||
req = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr)
|
||||
req = crypto.load_certificate_request(
|
||||
crypto.FILETYPE_PEM, csr)
|
||||
return req.verify(req.get_pubkey())
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
except crypto.Error:
|
||||
logger.debug("", exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
@@ -129,13 +132,13 @@ def csr_matches_pubkey(csr, privkey):
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
req = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr)
|
||||
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey)
|
||||
req = crypto.load_certificate_request(
|
||||
crypto.FILETYPE_PEM, csr)
|
||||
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, privkey)
|
||||
try:
|
||||
return req.verify(pkey)
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
except crypto.Error:
|
||||
logger.debug("", exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
@@ -145,26 +148,26 @@ def import_csr_file(csrfile, data):
|
||||
:param str csrfile: CSR filename
|
||||
:param str data: contents of the CSR file
|
||||
|
||||
:returns: (`OpenSSL.crypto.FILETYPE_PEM`,
|
||||
:returns: (`crypto.FILETYPE_PEM`,
|
||||
util.CSR object representing the CSR,
|
||||
list of domains requested in the CSR)
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
PEM = OpenSSL.crypto.FILETYPE_PEM
|
||||
load = OpenSSL.crypto.load_certificate_request
|
||||
PEM = crypto.FILETYPE_PEM
|
||||
load = crypto.load_certificate_request
|
||||
try:
|
||||
# Try to parse as DER first, then fall back to PEM.
|
||||
csr = load(OpenSSL.crypto.FILETYPE_ASN1, data)
|
||||
except OpenSSL.crypto.Error:
|
||||
csr = load(crypto.FILETYPE_ASN1, data)
|
||||
except crypto.Error:
|
||||
try:
|
||||
csr = load(PEM, data)
|
||||
except OpenSSL.crypto.Error:
|
||||
except crypto.Error:
|
||||
raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))
|
||||
|
||||
domains = _get_names_from_loaded_cert_or_req(csr)
|
||||
# Internally we always use PEM, so re-encode as PEM before returning.
|
||||
data_pem = OpenSSL.crypto.dump_certificate_request(PEM, csr)
|
||||
data_pem = crypto.dump_certificate_request(PEM, csr)
|
||||
return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains
|
||||
|
||||
|
||||
@@ -178,9 +181,9 @@ def make_key(bits):
|
||||
|
||||
"""
|
||||
assert bits >= 1024 # XXX
|
||||
key = OpenSSL.crypto.PKey()
|
||||
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
|
||||
return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
|
||||
key = crypto.PKey()
|
||||
key.generate_key(crypto.TYPE_RSA, bits)
|
||||
return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
|
||||
|
||||
|
||||
def valid_privkey(privkey):
|
||||
@@ -193,9 +196,9 @@ def valid_privkey(privkey):
|
||||
|
||||
"""
|
||||
try:
|
||||
return OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, privkey).check()
|
||||
except (TypeError, OpenSSL.crypto.Error):
|
||||
return crypto.load_privatekey(
|
||||
crypto.FILETYPE_PEM, privkey).check()
|
||||
except (TypeError, crypto.Error):
|
||||
return False
|
||||
|
||||
|
||||
@@ -224,13 +227,14 @@ def verify_renewable_cert_sig(renewable_cert):
|
||||
:raises errors.Error: If signature verification fails.
|
||||
"""
|
||||
try:
|
||||
with open(renewable_cert.chain, 'rb') as chain:
|
||||
chain, _ = pyopenssl_load_certificate(chain.read())
|
||||
with open(renewable_cert.cert, 'rb') as cert:
|
||||
cert = x509.load_pem_x509_certificate(cert.read(), default_backend())
|
||||
with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes]
|
||||
chain, _ = pyopenssl_load_certificate(chain_file.read())
|
||||
with open(renewable_cert.cert, 'rb') as cert_file: # type: IO[bytes]
|
||||
cert = x509.load_pem_x509_certificate(
|
||||
cert_file.read(), default_backend())
|
||||
hash_name = cert.signature_hash_algorithm.name
|
||||
OpenSSL.crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
|
||||
except (IOError, ValueError, OpenSSL.crypto.Error) as e:
|
||||
crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
|
||||
except (IOError, ValueError, crypto.Error) as e:
|
||||
error_str = "verifying the signature of the cert located at {0} has failed. \
|
||||
Details: {1}".format(renewable_cert.cert, e)
|
||||
logger.exception(error_str)
|
||||
@@ -246,11 +250,11 @@ def verify_cert_matches_priv_key(cert_path, key_path):
|
||||
:raises errors.Error: If they don't match.
|
||||
"""
|
||||
try:
|
||||
context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
context.use_certificate_file(cert_path)
|
||||
context.use_privatekey_file(key_path)
|
||||
context.check_privatekey()
|
||||
except (IOError, OpenSSL.SSL.Error) as e:
|
||||
except (IOError, SSL.Error) as e:
|
||||
error_str = "verifying the cert located at {0} matches the \
|
||||
private key located at {1} has failed. \
|
||||
Details: {2}".format(cert_path,
|
||||
@@ -267,12 +271,12 @@ def verify_fullchain(renewable_cert):
|
||||
:raises errors.Error: If cert and chain do not combine to fullchain.
|
||||
"""
|
||||
try:
|
||||
with open(renewable_cert.chain) as chain:
|
||||
chain = chain.read()
|
||||
with open(renewable_cert.cert) as cert:
|
||||
cert = cert.read()
|
||||
with open(renewable_cert.fullchain) as fullchain:
|
||||
fullchain = fullchain.read()
|
||||
with open(renewable_cert.chain) as chain_file: # type: IO[str]
|
||||
chain = chain_file.read()
|
||||
with open(renewable_cert.cert) as cert_file: # type: IO[str]
|
||||
cert = cert_file.read()
|
||||
with open(renewable_cert.fullchain) as fullchain_file: # type: IO[str]
|
||||
fullchain = fullchain_file.read()
|
||||
if (cert + chain) != fullchain:
|
||||
error_str = "fullchain does not match cert + chain for {0}!"
|
||||
error_str = error_str.format(renewable_cert.lineagename)
|
||||
@@ -294,43 +298,43 @@ def pyopenssl_load_certificate(data):
|
||||
|
||||
openssl_errors = []
|
||||
|
||||
for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1):
|
||||
for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1):
|
||||
try:
|
||||
return OpenSSL.crypto.load_certificate(file_type, data), file_type
|
||||
except OpenSSL.crypto.Error as error: # TODO: other errors?
|
||||
return crypto.load_certificate(file_type, data), file_type
|
||||
except crypto.Error as error: # TODO: other errors?
|
||||
openssl_errors.append(error)
|
||||
raise errors.Error("Unable to load: {0}".format(",".join(
|
||||
str(error) for error in openssl_errors)))
|
||||
|
||||
|
||||
def _load_cert_or_req(cert_or_req_str, load_func,
|
||||
typ=OpenSSL.crypto.FILETYPE_PEM):
|
||||
typ=crypto.FILETYPE_PEM):
|
||||
try:
|
||||
return load_func(typ, cert_or_req_str)
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.exception(error)
|
||||
except crypto.Error:
|
||||
logger.error("", exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
def _get_sans_from_cert_or_req(cert_or_req_str, load_func,
|
||||
typ=OpenSSL.crypto.FILETYPE_PEM):
|
||||
typ=crypto.FILETYPE_PEM):
|
||||
# pylint: disable=protected-access
|
||||
return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req(
|
||||
cert_or_req_str, load_func, typ))
|
||||
|
||||
|
||||
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
|
||||
def get_sans_from_cert(cert, typ=crypto.FILETYPE_PEM):
|
||||
"""Get a list of Subject Alternative Names from a certificate.
|
||||
|
||||
:param str cert: Certificate (encoded).
|
||||
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
|
||||
:param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
|
||||
|
||||
:returns: A list of Subject Alternative Names.
|
||||
:rtype: list
|
||||
|
||||
"""
|
||||
return _get_sans_from_cert_or_req(
|
||||
cert, OpenSSL.crypto.load_certificate, typ)
|
||||
cert, crypto.load_certificate, typ)
|
||||
|
||||
|
||||
def _get_names_from_cert_or_req(cert_or_req, load_func, typ):
|
||||
@@ -343,24 +347,24 @@ def _get_names_from_loaded_cert_or_req(loaded_cert_or_req):
|
||||
return acme_crypto_util._pyopenssl_cert_or_req_all_names(loaded_cert_or_req)
|
||||
|
||||
|
||||
def get_names_from_cert(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
|
||||
def get_names_from_cert(csr, typ=crypto.FILETYPE_PEM):
|
||||
"""Get a list of domains from a cert, including the CN if it is set.
|
||||
|
||||
:param str cert: Certificate (encoded).
|
||||
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
|
||||
:param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
|
||||
|
||||
:returns: A list of domain names.
|
||||
:rtype: list
|
||||
|
||||
"""
|
||||
return _get_names_from_cert_or_req(
|
||||
csr, OpenSSL.crypto.load_certificate, typ)
|
||||
csr, crypto.load_certificate, typ)
|
||||
|
||||
|
||||
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
|
||||
def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
|
||||
"""Dump certificate chain into a bundle.
|
||||
|
||||
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
|
||||
:param list chain: List of `crypto.X509` (or wrapped in
|
||||
:class:`josepy.util.ComparableX509`).
|
||||
|
||||
"""
|
||||
@@ -378,7 +382,7 @@ def notBefore(cert_path):
|
||||
:rtype: :class:`datetime.datetime`
|
||||
|
||||
"""
|
||||
return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notBefore)
|
||||
return _notAfterBefore(cert_path, crypto.X509.get_notBefore)
|
||||
|
||||
|
||||
def notAfter(cert_path):
|
||||
@@ -390,15 +394,15 @@ def notAfter(cert_path):
|
||||
:rtype: :class:`datetime.datetime`
|
||||
|
||||
"""
|
||||
return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notAfter)
|
||||
return _notAfterBefore(cert_path, crypto.X509.get_notAfter)
|
||||
|
||||
|
||||
def _notAfterBefore(cert_path, method):
|
||||
"""Internal helper function for finding notbefore/notafter.
|
||||
|
||||
:param str cert_path: path to a cert in PEM format
|
||||
:param function method: one of ``OpenSSL.crypto.X509.get_notBefore``
|
||||
or ``OpenSSL.crypto.X509.get_notAfter``
|
||||
:param function method: one of ``crypto.X509.get_notBefore``
|
||||
or ``crypto.X509.get_notAfter``
|
||||
|
||||
:returns: the notBefore or notAfter value from the cert at cert_path
|
||||
:rtype: :class:`datetime.datetime`
|
||||
@@ -406,7 +410,7 @@ def _notAfterBefore(cert_path, method):
|
||||
"""
|
||||
# pylint: disable=redefined-outer-name
|
||||
with open(cert_path) as f:
|
||||
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
|
||||
x509 = crypto.load_certificate(crypto.FILETYPE_PEM,
|
||||
f.read())
|
||||
# pyopenssl always returns bytes
|
||||
timestamp = method(x509)
|
||||
@@ -443,7 +447,7 @@ def cert_and_chain_from_fullchain(fullchain_pem):
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
|
||||
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem)).decode()
|
||||
cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
|
||||
crypto.load_certificate(crypto.FILETYPE_PEM, fullchain_pem)).decode()
|
||||
chain = fullchain_pem[len(cert):].lstrip()
|
||||
return (cert, chain)
|
||||
|
||||
@@ -5,6 +5,10 @@ import os
|
||||
import signal
|
||||
import traceback
|
||||
|
||||
# pylint: disable=unused-import, no-name-in-module
|
||||
from acme.magic_typing import Any, Callable, Dict, List, Union
|
||||
# pylint: enable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import errors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -56,9 +60,9 @@ class ErrorHandler(object):
|
||||
def __init__(self, func=None, *args, **kwargs):
|
||||
self.call_on_regular_exit = False
|
||||
self.body_executed = False
|
||||
self.funcs = []
|
||||
self.prev_handlers = {}
|
||||
self.received_signals = []
|
||||
self.funcs = [] # type: List[Callable[[], Any]]
|
||||
self.prev_handlers = {} # type: Dict[int, Union[int, None, Callable]]
|
||||
self.received_signals = [] # type: List[int]
|
||||
if func is not None:
|
||||
self.register(func, *args, **kwargs)
|
||||
|
||||
@@ -88,6 +92,7 @@ class ErrorHandler(object):
|
||||
return retval
|
||||
|
||||
def register(self, func, *args, **kwargs):
|
||||
# type: (Callable, *Any, **Any) -> None
|
||||
"""Sets func to be run with the given arguments during cleanup.
|
||||
|
||||
:param function func: function to be called in case of an error
|
||||
@@ -101,9 +106,8 @@ class ErrorHandler(object):
|
||||
while self.funcs:
|
||||
try:
|
||||
self.funcs[-1]()
|
||||
except Exception as error: # pylint: disable=broad-except
|
||||
logger.error("Encountered exception during recovery")
|
||||
logger.exception(error)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
logger.error("Encountered exception during recovery: ", exc_info=True)
|
||||
self.funcs.pop()
|
||||
|
||||
def _set_signal_handlers(self):
|
||||
|
||||
@@ -6,6 +6,7 @@ import os
|
||||
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
from acme.magic_typing import Set, List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
|
||||
@@ -76,7 +77,8 @@ def pre_hook(config):
|
||||
if cmd:
|
||||
_run_pre_hook_if_necessary(cmd)
|
||||
|
||||
pre_hook.already = set() # type: ignore
|
||||
|
||||
executed_pre_hooks = set() # type: Set[str]
|
||||
|
||||
|
||||
def _run_pre_hook_if_necessary(command):
|
||||
@@ -88,12 +90,12 @@ def _run_pre_hook_if_necessary(command):
|
||||
:param str command: pre-hook to be run
|
||||
|
||||
"""
|
||||
if command in pre_hook.already:
|
||||
if command in executed_pre_hooks:
|
||||
logger.info("Pre-hook command already run, skipping: %s", command)
|
||||
else:
|
||||
logger.info("Running pre-hook command: %s", command)
|
||||
_run_hook(command)
|
||||
pre_hook.already.add(command)
|
||||
executed_pre_hooks.add(command)
|
||||
|
||||
|
||||
def post_hook(config):
|
||||
@@ -127,7 +129,8 @@ def post_hook(config):
|
||||
logger.info("Running post-hook command: %s", cmd)
|
||||
_run_hook(cmd)
|
||||
|
||||
post_hook.eventually = [] # type: ignore
|
||||
|
||||
post_hooks = [] # type: List[str]
|
||||
|
||||
|
||||
def _run_eventually(command):
|
||||
@@ -139,13 +142,13 @@ def _run_eventually(command):
|
||||
:param str command: post-hook to register to be run
|
||||
|
||||
"""
|
||||
if command not in post_hook.eventually:
|
||||
post_hook.eventually.append(command)
|
||||
if command not in post_hooks:
|
||||
post_hooks.append(command)
|
||||
|
||||
|
||||
def run_saved_post_hooks():
|
||||
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
|
||||
for cmd in post_hook.eventually:
|
||||
for cmd in post_hooks:
|
||||
logger.info("Running post-hook command: %s", cmd)
|
||||
_run_hook(cmd)
|
||||
|
||||
|
||||
@@ -191,9 +191,8 @@ class MemoryHandler(logging.handlers.MemoryHandler):
|
||||
only happens when flush(force=True) is called.
|
||||
|
||||
"""
|
||||
def __init__(self, target=None):
|
||||
def __init__(self, target=None, capacity=10000):
|
||||
# capacity doesn't matter because should_flush() is overridden
|
||||
capacity = float('inf')
|
||||
super(MemoryHandler, self).__init__(capacity, target=target)
|
||||
|
||||
def close(self):
|
||||
|
||||
@@ -11,6 +11,7 @@ import josepy as jose
|
||||
import zope.component
|
||||
|
||||
from acme import errors as acme_errors
|
||||
from acme.magic_typing import Union # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
import certbot
|
||||
|
||||
@@ -520,8 +521,8 @@ def _determine_account(config):
|
||||
config, account_storage, tos_cb=_tos_cb)
|
||||
except errors.MissingCommandlineFlag:
|
||||
raise
|
||||
except errors.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
except errors.Error:
|
||||
logger.debug("", exc_info=True)
|
||||
raise errors.Error(
|
||||
"Unable to register an account with ACME server")
|
||||
|
||||
@@ -1271,7 +1272,8 @@ def set_displayer(config):
|
||||
"""
|
||||
if config.quiet:
|
||||
config.noninteractive_mode = True
|
||||
displayer = display_util.NoninteractiveDisplay(open(os.devnull, "w"))
|
||||
displayer = display_util.NoninteractiveDisplay(open(os.devnull, "w")) \
|
||||
# type: Union[None, display_util.NoninteractiveDisplay, display_util.FileDisplay]
|
||||
elif config.noninteractive_mode:
|
||||
displayer = display_util.NoninteractiveDisplay(sys.stdout)
|
||||
else:
|
||||
|
||||
@@ -11,6 +11,8 @@ import zope.interface
|
||||
|
||||
from josepy import util as jose_util
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import achallenges # pylint: disable=unused-import
|
||||
from certbot import constants
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
@@ -331,8 +333,8 @@ class ChallengePerformer(object):
|
||||
|
||||
def __init__(self, configurator):
|
||||
self.configurator = configurator
|
||||
self.achalls = []
|
||||
self.indices = []
|
||||
self.achalls = [] # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
|
||||
self.indices = [] # type: List[int]
|
||||
|
||||
def add_chall(self, achall, idx=None):
|
||||
"""Store challenge to be performed when perform() is called.
|
||||
|
||||
@@ -10,6 +10,7 @@ from collections import OrderedDict
|
||||
import zope.interface
|
||||
import zope.interface.verify
|
||||
|
||||
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
@@ -189,7 +190,7 @@ class PluginsRegistry(collections.Mapping):
|
||||
@classmethod
|
||||
def find_all(cls):
|
||||
"""Find plugins using setuptools entry points."""
|
||||
plugins = {}
|
||||
plugins = {} # type: Dict[str, PluginEntryPoint]
|
||||
# pylint: disable=not-callable
|
||||
entry_points = itertools.chain(
|
||||
pkg_resources.iter_entry_points(
|
||||
|
||||
@@ -8,6 +8,7 @@ import pkg_resources
|
||||
import six
|
||||
import zope.interface
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
|
||||
@@ -250,7 +251,7 @@ class PluginsRegistryTest(unittest.TestCase):
|
||||
self.plugin_ep.prepare.assert_called_once_with()
|
||||
|
||||
def test_prepare_order(self):
|
||||
order = []
|
||||
order = [] # type: List[str]
|
||||
plugins = dict(
|
||||
(c, mock.MagicMock(prepare=functools.partial(order.append, c)))
|
||||
for c in string.ascii_letters)
|
||||
|
||||
@@ -5,7 +5,9 @@ import zope.component
|
||||
import zope.interface
|
||||
|
||||
from acme import challenges
|
||||
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import achallenges # pylint: disable=unused-import
|
||||
from certbot import interfaces
|
||||
from certbot import errors
|
||||
from certbot import hooks
|
||||
@@ -98,7 +100,8 @@ when it receives a TLS ClientHello with the SNI extension set to
|
||||
super(Authenticator, self).__init__(*args, **kwargs)
|
||||
self.reverter = reverter.Reverter(self.config)
|
||||
self.reverter.recovery_routine()
|
||||
self.env = dict()
|
||||
self.env = dict() \
|
||||
# type: Dict[achallenges.KeyAuthorizationAnnotatedChallenge, Dict[str, str]]
|
||||
self.tls_sni_01 = None
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -6,6 +6,7 @@ import unittest
|
||||
import mock
|
||||
import zope.component
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot.display import util as display_util
|
||||
from certbot.tests import util as test_util
|
||||
from certbot import interfaces
|
||||
@@ -47,7 +48,7 @@ class PickPluginTest(unittest.TestCase):
|
||||
self.default = None
|
||||
self.reg = mock.MagicMock()
|
||||
self.question = "Question?"
|
||||
self.ifaces = []
|
||||
self.ifaces = [] # type: List[interfaces.IPlugin]
|
||||
|
||||
def _call(self):
|
||||
from certbot.plugins.selection import pick_plugin
|
||||
|
||||
@@ -3,6 +3,8 @@ import argparse
|
||||
import collections
|
||||
import logging
|
||||
import socket
|
||||
# https://github.com/python/typeshed/blob/master/stdlib/2and3/socket.pyi
|
||||
from socket import errno as socket_errors # type: ignore
|
||||
|
||||
import OpenSSL
|
||||
import six
|
||||
@@ -10,7 +12,10 @@ import zope.interface
|
||||
|
||||
from acme import challenges
|
||||
from acme import standalone as acme_standalone
|
||||
# pylint: disable=unused-import, no-name-in-module
|
||||
from acme.magic_typing import DefaultDict, Dict, Set, Tuple, List, Type, TYPE_CHECKING
|
||||
|
||||
from certbot import achallenges # pylint: disable=unused-import
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
|
||||
@@ -18,6 +23,11 @@ from certbot.plugins import common
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
ServedType = DefaultDict[
|
||||
acme_standalone.BaseDualNetworkedServers,
|
||||
Set[achallenges.KeyAuthorizationAnnotatedChallenge]
|
||||
]
|
||||
|
||||
class ServerManager(object):
|
||||
"""Standalone servers manager.
|
||||
@@ -33,7 +43,7 @@ class ServerManager(object):
|
||||
|
||||
"""
|
||||
def __init__(self, certs, http_01_resources):
|
||||
self._instances = {}
|
||||
self._instances = {} # type: Dict[int, acme_standalone.BaseDualNetworkedServers]
|
||||
self.certs = certs
|
||||
self.http_01_resources = http_01_resources
|
||||
|
||||
@@ -59,7 +69,8 @@ class ServerManager(object):
|
||||
address = (listenaddr, port)
|
||||
try:
|
||||
if challenge_type is challenges.TLSSNI01:
|
||||
servers = acme_standalone.TLSSNI01DualNetworkedServers(address, self.certs)
|
||||
servers = acme_standalone.TLSSNI01DualNetworkedServers(
|
||||
address, self.certs) # type: acme_standalone.BaseDualNetworkedServers
|
||||
else: # challenges.HTTP01
|
||||
servers = acme_standalone.HTTP01DualNetworkedServers(
|
||||
address, self.http_01_resources)
|
||||
@@ -103,7 +114,8 @@ class ServerManager(object):
|
||||
return self._instances.copy()
|
||||
|
||||
|
||||
SUPPORTED_CHALLENGES = [challenges.TLSSNI01, challenges.HTTP01]
|
||||
SUPPORTED_CHALLENGES = [challenges.TLSSNI01, challenges.HTTP01] \
|
||||
# type: List[Type[challenges.KeyAuthorizationChallenge]]
|
||||
|
||||
|
||||
class SupportedChallengesAction(argparse.Action):
|
||||
@@ -179,14 +191,15 @@ class Authenticator(common.Plugin):
|
||||
self.key = OpenSSL.crypto.PKey()
|
||||
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
|
||||
|
||||
self.served = collections.defaultdict(set)
|
||||
self.served = collections.defaultdict(set) # type: ServedType
|
||||
|
||||
# Stuff below is shared across threads (i.e. servers read
|
||||
# values, main thread writes). Due to the nature of CPython's
|
||||
# GIL, the operations are safe, c.f.
|
||||
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
|
||||
self.certs = {}
|
||||
self.http_01_resources = set()
|
||||
self.certs = {} # type: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]]
|
||||
self.http_01_resources = set() \
|
||||
# type: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
|
||||
|
||||
self.servers = ServerManager(self.certs, self.http_01_resources)
|
||||
|
||||
@@ -265,13 +278,13 @@ class Authenticator(common.Plugin):
|
||||
|
||||
|
||||
def _handle_perform_error(error):
|
||||
if error.socket_error.errno == socket.errno.EACCES:
|
||||
if error.socket_error.errno == socket_errors.EACCES:
|
||||
raise errors.PluginError(
|
||||
"Could not bind TCP port {0} because you don't have "
|
||||
"the appropriate permissions (for example, you "
|
||||
"aren't running this program as "
|
||||
"root).".format(error.port))
|
||||
elif error.socket_error.errno == socket.errno.EADDRINUSE:
|
||||
elif error.socket_error.errno == socket_errors.EADDRINUSE:
|
||||
display = zope.component.getUtility(interfaces.IDisplay)
|
||||
msg = (
|
||||
"Could not bind TCP port {0} because it is already in "
|
||||
|
||||
@@ -2,12 +2,18 @@
|
||||
import argparse
|
||||
import socket
|
||||
import unittest
|
||||
# https://github.com/python/typeshed/blob/master/stdlib/2and3/socket.pyi
|
||||
from socket import errno as socket_errors # type: ignore
|
||||
|
||||
import josepy as jose
|
||||
import mock
|
||||
import six
|
||||
|
||||
import OpenSSL.crypto # pylint: disable=unused-import
|
||||
|
||||
from acme import challenges
|
||||
from acme import standalone as acme_standalone # pylint: disable=unused-import
|
||||
from acme.magic_typing import Dict, Tuple, Set # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import achallenges
|
||||
from certbot import errors
|
||||
@@ -21,8 +27,9 @@ class ServerManagerTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
from certbot.plugins.standalone import ServerManager
|
||||
self.certs = {}
|
||||
self.http_01_resources = {}
|
||||
self.certs = {} # type: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]]
|
||||
self.http_01_resources = {} \
|
||||
# type: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
|
||||
self.mgr = ServerManager(self.certs, self.http_01_resources)
|
||||
|
||||
def test_init(self):
|
||||
@@ -159,7 +166,7 @@ class AuthenticatorTest(unittest.TestCase):
|
||||
@test_util.patch_get_utility()
|
||||
def test_perform_eaddrinuse_retry(self, mock_get_utility):
|
||||
mock_utility = mock_get_utility()
|
||||
errno = socket.errno.EADDRINUSE
|
||||
errno = socket_errors.EADDRINUSE
|
||||
error = errors.StandaloneBindError(mock.MagicMock(errno=errno), -1)
|
||||
self.auth.servers.run.side_effect = [error] + 2 * [mock.MagicMock()]
|
||||
mock_yesno = mock_utility.yesno
|
||||
@@ -174,7 +181,7 @@ class AuthenticatorTest(unittest.TestCase):
|
||||
mock_yesno = mock_utility.yesno
|
||||
mock_yesno.return_value = False
|
||||
|
||||
errno = socket.errno.EADDRINUSE
|
||||
errno = socket_errors.EADDRINUSE
|
||||
self.assertRaises(errors.PluginError, self._fail_perform, errno)
|
||||
self._assert_correct_yesno_call(mock_yesno)
|
||||
|
||||
@@ -184,11 +191,11 @@ class AuthenticatorTest(unittest.TestCase):
|
||||
self.assertFalse(yesno_kwargs.get("default", True))
|
||||
|
||||
def test_perform_eacces(self):
|
||||
errno = socket.errno.EACCES
|
||||
errno = socket_errors.EACCES
|
||||
self.assertRaises(errors.PluginError, self._fail_perform, errno)
|
||||
|
||||
def test_perform_unexpected_socket_error(self):
|
||||
errno = socket.errno.ENOTCONN
|
||||
errno = socket_errors.ENOTCONN
|
||||
self.assertRaises(
|
||||
errors.StandaloneBindError, self._fail_perform, errno)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
from acme.magic_typing import Any, Dict # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import errors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -38,7 +39,7 @@ class PluginStorage(object):
|
||||
|
||||
:raises .errors.PluginStorageError: when unable to open or read the file
|
||||
"""
|
||||
data = dict()
|
||||
data = dict() # type: Dict[str, Any]
|
||||
filedata = ""
|
||||
try:
|
||||
with open(self._storagepath, 'r') as fh:
|
||||
|
||||
@@ -10,8 +10,12 @@ import six
|
||||
import zope.component
|
||||
import zope.interface
|
||||
|
||||
from acme import challenges
|
||||
from acme import challenges # pylint: disable=unused-import
|
||||
# pylint: disable=unused-import, no-name-in-module
|
||||
from acme.magic_typing import Dict, Set, DefaultDict, List
|
||||
# pylint: enable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import achallenges # pylint: disable=unused-import
|
||||
from certbot import cli
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
@@ -64,10 +68,11 @@ to serve all files under specified web root ({0})."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Authenticator, self).__init__(*args, **kwargs)
|
||||
self.full_roots = {}
|
||||
self.performed = collections.defaultdict(set)
|
||||
self.full_roots = {} # type: Dict[str, str]
|
||||
self.performed = collections.defaultdict(set) \
|
||||
# type: DefaultDict[str, Set[achallenges.KeyAuthorizationAnnotatedChallenge]]
|
||||
# stack of dirs successfully created by this authenticator
|
||||
self._created_dirs = []
|
||||
self._created_dirs = [] # type: List[str]
|
||||
|
||||
def prepare(self): # pylint: disable=missing-docstring
|
||||
pass
|
||||
@@ -156,7 +161,6 @@ to serve all files under specified web root ({0})."""
|
||||
" --help webroot for examples.")
|
||||
for name, path in path_map.items():
|
||||
self.full_roots[name] = os.path.join(path, challenges.HTTP01.URI_ROOT_PATH)
|
||||
|
||||
logger.debug("Creating root challenges validation dir at %s",
|
||||
self.full_roots[name])
|
||||
|
||||
@@ -207,7 +211,6 @@ to serve all files under specified web root ({0})."""
|
||||
os.umask(old_umask)
|
||||
|
||||
self.performed[root_path].add(achall)
|
||||
|
||||
return response
|
||||
|
||||
def cleanup(self, achalls): # pylint: disable=missing-docstring
|
||||
@@ -219,7 +222,7 @@ to serve all files under specified web root ({0})."""
|
||||
os.remove(validation_path)
|
||||
self.performed[root_path].remove(achall)
|
||||
|
||||
not_removed = []
|
||||
not_removed = [] # type: List[str]
|
||||
while len(self._created_dirs) > 0:
|
||||
path = self._created_dirs.pop()
|
||||
try:
|
||||
|
||||
@@ -11,6 +11,8 @@ import zope.component
|
||||
|
||||
import OpenSSL
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import cli
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
@@ -59,8 +61,8 @@ def _reconstitute(config, full_path):
|
||||
"""
|
||||
try:
|
||||
renewal_candidate = storage.RenewableCert(full_path, config)
|
||||
except (errors.CertStorageError, IOError) as exc:
|
||||
logger.warning(exc)
|
||||
except (errors.CertStorageError, IOError):
|
||||
logger.warning("", exc_info=True)
|
||||
logger.warning("Renewal configuration file %s is broken. Skipping.", full_path)
|
||||
logger.debug("Traceback was:\n%s", traceback.format_exc())
|
||||
return None
|
||||
@@ -133,14 +135,15 @@ def _restore_plugin_configs(config, renewalparams):
|
||||
# longer defined, stored copies of that parameter will be
|
||||
# deserialized as strings by this logic even if they were
|
||||
# originally meant to be some other type.
|
||||
plugin_prefixes = [] # type: List[str]
|
||||
if renewalparams["authenticator"] == "webroot":
|
||||
_restore_webroot_config(config, renewalparams)
|
||||
plugin_prefixes = []
|
||||
else:
|
||||
plugin_prefixes = [renewalparams["authenticator"]]
|
||||
plugin_prefixes.append(renewalparams["authenticator"])
|
||||
|
||||
if renewalparams.get("installer", None) is not None:
|
||||
if renewalparams.get("installer") is not None:
|
||||
plugin_prefixes.append(renewalparams["installer"])
|
||||
|
||||
for plugin_prefix in set(plugin_prefixes):
|
||||
plugin_prefix = plugin_prefix.replace('-', '_')
|
||||
for config_item, config_value in six.iteritems(renewalparams):
|
||||
@@ -316,13 +319,13 @@ def report(msgs, category):
|
||||
def _renew_describe_results(config, renew_successes, renew_failures,
|
||||
renew_skipped, parse_failures):
|
||||
|
||||
out = []
|
||||
out = [] # type: List[str]
|
||||
notify = out.append
|
||||
disp = zope.component.getUtility(interfaces.IDisplay)
|
||||
|
||||
def notify_error(err):
|
||||
"""Notify and log errors."""
|
||||
notify(err)
|
||||
notify(str(err))
|
||||
logger.error(err)
|
||||
|
||||
if config.dry_run:
|
||||
|
||||
@@ -82,8 +82,10 @@ class Reverter(object):
|
||||
self._recover_checkpoint(self.config.temp_checkpoint_dir)
|
||||
except errors.ReverterError:
|
||||
# We have a partial or incomplete recovery
|
||||
logger.fatal("Incomplete or failed recovery for %s",
|
||||
self.config.temp_checkpoint_dir)
|
||||
logger.critical(
|
||||
"Incomplete or failed recovery for %s",
|
||||
self.config.temp_checkpoint_dir,
|
||||
)
|
||||
raise errors.ReverterError("Unable to revert temporary config")
|
||||
|
||||
def rollback_checkpoints(self, rollback=1):
|
||||
@@ -123,7 +125,7 @@ class Reverter(object):
|
||||
try:
|
||||
self._recover_checkpoint(cp_dir)
|
||||
except errors.ReverterError:
|
||||
logger.fatal("Failed to load checkpoint during rollback")
|
||||
logger.critical("Failed to load checkpoint during rollback")
|
||||
raise errors.ReverterError(
|
||||
"Unable to load checkpoint during rollback")
|
||||
rollback -= 1
|
||||
@@ -457,7 +459,7 @@ class Reverter(object):
|
||||
self._recover_checkpoint(self.config.in_progress_dir)
|
||||
except errors.ReverterError:
|
||||
# We have a partial or incomplete recovery
|
||||
logger.fatal("Incomplete or failed recovery for IN_PROGRESS "
|
||||
logger.critical("Incomplete or failed recovery for IN_PROGRESS "
|
||||
"checkpoint - %s",
|
||||
self.config.in_progress_dir)
|
||||
raise errors.ReverterError(
|
||||
@@ -494,7 +496,7 @@ class Reverter(object):
|
||||
"Certbot probably shut down unexpectedly",
|
||||
os.linesep, path)
|
||||
except (IOError, OSError):
|
||||
logger.fatal(
|
||||
logger.critical(
|
||||
"Unable to remove filepaths contained within %s", file_list)
|
||||
raise errors.ReverterError(
|
||||
"Unable to remove filepaths contained within "
|
||||
|
||||
@@ -10,6 +10,7 @@ import zope.component
|
||||
from acme import challenges
|
||||
from acme import client as acme_client
|
||||
from acme import messages
|
||||
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import achallenges
|
||||
from certbot import errors
|
||||
@@ -354,12 +355,13 @@ class PollChallengesTest(unittest.TestCase):
|
||||
acme_util.CHALLENGES, [messages.STATUS_PENDING] * 3, False), [])
|
||||
]
|
||||
|
||||
self.chall_update = {}
|
||||
self.chall_update = {} # type: Dict[int, achallenges.KeyAuthorizationAnnotatedChallenge]
|
||||
for i, aauthzr in enumerate(self.aauthzrs):
|
||||
self.chall_update[i] = [
|
||||
challb_to_achall(challb, mock.Mock(key="dummy_key"), self.doms[i])
|
||||
for challb in aauthzr.authzr.body.challenges]
|
||||
|
||||
|
||||
@mock.patch("certbot.auth_handler.time")
|
||||
def test_poll_challenges(self, unused_mock_time):
|
||||
self.mock_net.poll.side_effect = self._mock_poll_solve_one_valid
|
||||
|
||||
@@ -495,7 +495,8 @@ class SetByCliTest(unittest.TestCase):
|
||||
for v in ('manual', 'manual_auth_hook', 'manual_public_ip_logging_ok'):
|
||||
self.assertTrue(_call_set_by_cli(v, args, verb))
|
||||
|
||||
cli.set_by_cli.detector = None
|
||||
# https://github.com/python/mypy/issues/2087
|
||||
cli.set_by_cli.detector = None # type: ignore
|
||||
|
||||
args = ['--manual-auth-hook', 'command']
|
||||
for v in ('manual_auth_hook', 'manual_public_ip_logging_ok'):
|
||||
|
||||
@@ -8,6 +8,7 @@ import unittest
|
||||
import mock
|
||||
from six.moves import reload_module # pylint: disable=import-error
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot.tests.util import TempDirTestCase
|
||||
|
||||
class CompleterTest(TempDirTestCase):
|
||||
@@ -21,7 +22,7 @@ class CompleterTest(TempDirTestCase):
|
||||
if self.tempdir[-1] != os.sep:
|
||||
self.tempdir += os.sep
|
||||
|
||||
self.paths = []
|
||||
self.paths = [] # type: List[str]
|
||||
# create some files and directories in temp_dir
|
||||
for c in string.ascii_lowercase:
|
||||
path = os.path.join(self.tempdir, c)
|
||||
|
||||
@@ -6,6 +6,9 @@ import sys
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
# pylint: disable=unused-import, no-name-in-module
|
||||
from acme.magic_typing import Callable, Dict, Union
|
||||
# pylint: enable=unused-import, no-name-in-module
|
||||
|
||||
|
||||
def get_signals(signums):
|
||||
@@ -23,8 +26,7 @@ def set_signals(sig_handler_dict):
|
||||
def signal_receiver(signums):
|
||||
"""Context manager to catch signals"""
|
||||
signals = []
|
||||
prev_handlers = {}
|
||||
prev_handlers = get_signals(signums)
|
||||
prev_handlers = get_signals(signums) # type: Dict[int, Union[int, None, Callable]]
|
||||
set_signals(dict((s, lambda s, _: signals.append(s)) for s in signums))
|
||||
yield signals
|
||||
set_signals(prev_handlers)
|
||||
|
||||
@@ -5,6 +5,7 @@ import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import errors
|
||||
from certbot.tests import util
|
||||
|
||||
@@ -106,8 +107,8 @@ class PreHookTest(HookTest):
|
||||
super(PreHookTest, self).tearDown()
|
||||
|
||||
def _reset_pre_hook_already(self):
|
||||
from certbot.hooks import pre_hook
|
||||
pre_hook.already.clear()
|
||||
from certbot.hooks import executed_pre_hooks
|
||||
executed_pre_hooks.clear()
|
||||
|
||||
def test_certonly(self):
|
||||
self.config.verb = "certonly"
|
||||
@@ -184,8 +185,8 @@ class PostHookTest(HookTest):
|
||||
super(PostHookTest, self).tearDown()
|
||||
|
||||
def _reset_post_hook_eventually(self):
|
||||
from certbot.hooks import post_hook
|
||||
post_hook.eventually = []
|
||||
from certbot.hooks import post_hooks
|
||||
del post_hooks[:]
|
||||
|
||||
def test_certonly_and_run_with_hook(self):
|
||||
for verb in ("certonly", "run",):
|
||||
@@ -238,8 +239,8 @@ class PostHookTest(HookTest):
|
||||
self.assertEqual(self._get_eventually(), expected)
|
||||
|
||||
def _get_eventually(self):
|
||||
from certbot.hooks import post_hook
|
||||
return post_hook.eventually
|
||||
from certbot.hooks import post_hooks
|
||||
return post_hooks
|
||||
|
||||
|
||||
class RunSavedPostHooksTest(HookTest):
|
||||
@@ -248,23 +249,23 @@ class RunSavedPostHooksTest(HookTest):
|
||||
@classmethod
|
||||
def _call(cls, *args, **kwargs):
|
||||
from certbot.hooks import run_saved_post_hooks
|
||||
return run_saved_post_hooks(*args, **kwargs)
|
||||
return run_saved_post_hooks()
|
||||
|
||||
def _call_with_mock_execute_and_eventually(self, *args, **kwargs):
|
||||
"""Call run_saved_post_hooks but mock out execute and eventually
|
||||
|
||||
certbot.hooks.post_hook.eventually is replaced with
|
||||
certbot.hooks.post_hooks is replaced with
|
||||
self.eventually. The mock execute object is returned rather than
|
||||
the return value of run_saved_post_hooks.
|
||||
|
||||
"""
|
||||
eventually_path = "certbot.hooks.post_hook.eventually"
|
||||
eventually_path = "certbot.hooks.post_hooks"
|
||||
with mock.patch(eventually_path, new=self.eventually):
|
||||
return self._call_with_mock_execute(*args, **kwargs)
|
||||
|
||||
def setUp(self):
|
||||
super(RunSavedPostHooksTest, self).setUp()
|
||||
self.eventually = []
|
||||
self.eventually = [] # type: List[str]
|
||||
|
||||
def test_empty(self):
|
||||
self.assertFalse(self._call_with_mock_execute_and_eventually().called)
|
||||
|
||||
@@ -10,6 +10,7 @@ import mock
|
||||
import six
|
||||
|
||||
from acme import messages
|
||||
from acme.magic_typing import Optional # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
@@ -21,9 +22,9 @@ class PreArgParseSetupTest(unittest.TestCase):
|
||||
"""Tests for certbot.log.pre_arg_parse_setup."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, *args, **kwargs):
|
||||
def _call(cls, *args, **kwargs): # pylint: disable=unused-argument
|
||||
from certbot.log import pre_arg_parse_setup
|
||||
return pre_arg_parse_setup(*args, **kwargs)
|
||||
return pre_arg_parse_setup()
|
||||
|
||||
@mock.patch('certbot.log.sys')
|
||||
@mock.patch('certbot.log.pre_arg_parse_except_hook')
|
||||
@@ -38,16 +39,16 @@ class PreArgParseSetupTest(unittest.TestCase):
|
||||
mock_root_logger.setLevel.assert_called_once_with(logging.DEBUG)
|
||||
self.assertEqual(mock_root_logger.addHandler.call_count, 2)
|
||||
|
||||
MemoryHandler = logging.handlers.MemoryHandler
|
||||
memory_handler = None
|
||||
memory_handler = None # type: Optional[logging.handlers.MemoryHandler]
|
||||
for call in mock_root_logger.addHandler.call_args_list:
|
||||
handler = call[0][0]
|
||||
if memory_handler is None and isinstance(handler, MemoryHandler):
|
||||
if memory_handler is None and isinstance(handler, logging.handlers.MemoryHandler):
|
||||
memory_handler = handler
|
||||
target = memory_handler.target # type: ignore
|
||||
else:
|
||||
self.assertTrue(isinstance(handler, logging.StreamHandler))
|
||||
self.assertTrue(
|
||||
isinstance(memory_handler.target, logging.StreamHandler))
|
||||
isinstance(target, logging.StreamHandler))
|
||||
|
||||
mock_register.assert_called_once_with(logging.shutdown)
|
||||
mock_sys.excepthook(1, 2, 3)
|
||||
|
||||
@@ -16,12 +16,14 @@ import josepy as jose
|
||||
import six
|
||||
from six.moves import reload_module # pylint: disable=import-error
|
||||
|
||||
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import account
|
||||
from certbot import cli
|
||||
from certbot import constants
|
||||
from certbot import configuration
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import interfaces # pylint: disable=unused-import
|
||||
from certbot import main
|
||||
from certbot import updater
|
||||
from certbot import util
|
||||
@@ -600,14 +602,14 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
|
||||
if mockisfile:
|
||||
orig_open = os.path.isfile
|
||||
def mock_isfile(fn, *args, **kwargs):
|
||||
def mock_isfile(fn, *args, **kwargs): # pylint: disable=unused-argument
|
||||
"""Mock os.path.isfile()"""
|
||||
if (fn.endswith("cert") or
|
||||
fn.endswith("chain") or
|
||||
fn.endswith("privkey")):
|
||||
return True
|
||||
else:
|
||||
return orig_open(fn, *args, **kwargs)
|
||||
return orig_open(fn)
|
||||
|
||||
with mock.patch("os.path.isfile") as mock_if:
|
||||
mock_if.side_effect = mock_isfile
|
||||
@@ -836,7 +838,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
@mock.patch('certbot.main.plugins_disco')
|
||||
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_no_args(self, _det, mock_disco):
|
||||
ifaces = []
|
||||
ifaces = [] # type: List[interfaces.IPlugin]
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
stdout = six.StringIO()
|
||||
@@ -851,7 +853,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
@mock.patch('certbot.main.plugins_disco')
|
||||
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_no_args_unprivileged(self, _det, mock_disco):
|
||||
ifaces = []
|
||||
ifaces = [] # type: List[interfaces.IPlugin]
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
def throw_error(directory, mode, uid, strict):
|
||||
@@ -873,7 +875,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
@mock.patch('certbot.main.plugins_disco')
|
||||
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_init(self, _det, mock_disco):
|
||||
ifaces = []
|
||||
ifaces = [] # type: List[interfaces.IPlugin]
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
stdout = six.StringIO()
|
||||
@@ -891,7 +893,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
@mock.patch('certbot.main.plugins_disco')
|
||||
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_prepare(self, _det, mock_disco):
|
||||
ifaces = []
|
||||
ifaces = [] # type: List[interfaces.IPlugin]
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
stdout = six.StringIO()
|
||||
@@ -1040,9 +1042,8 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
mock_client.obtain_certificate.return_value = (mock_certr, 'chain',
|
||||
mock_key, 'csr')
|
||||
|
||||
def write_msg(message, *args, **kwargs):
|
||||
def write_msg(message, *args, **kwargs): # pylint: disable=unused-argument
|
||||
"""Write message to stdout."""
|
||||
_, _ = args, kwargs
|
||||
stdout.write(message)
|
||||
|
||||
try:
|
||||
|
||||
@@ -12,7 +12,7 @@ class ReporterTest(unittest.TestCase):
|
||||
from certbot import reporter
|
||||
self.reporter = reporter.Reporter(mock.MagicMock(quiet=False))
|
||||
|
||||
self.old_stdout = sys.stdout
|
||||
self.old_stdout = sys.stdout # type: ignore
|
||||
sys.stdout = six.StringIO()
|
||||
|
||||
def tearDown(self):
|
||||
@@ -21,32 +21,32 @@ class ReporterTest(unittest.TestCase):
|
||||
def test_multiline_message(self):
|
||||
self.reporter.add_message("Line 1\nLine 2", self.reporter.LOW_PRIORITY)
|
||||
self.reporter.print_messages()
|
||||
output = sys.stdout.getvalue()
|
||||
output = sys.stdout.getvalue() # type: ignore
|
||||
self.assertTrue("Line 1\n" in output)
|
||||
self.assertTrue("Line 2" in output)
|
||||
|
||||
def test_tty_print_empty(self):
|
||||
sys.stdout.isatty = lambda: True
|
||||
sys.stdout.isatty = lambda: True # type: ignore
|
||||
self.test_no_tty_print_empty()
|
||||
|
||||
def test_no_tty_print_empty(self):
|
||||
self.reporter.print_messages()
|
||||
self.assertEqual(sys.stdout.getvalue(), "")
|
||||
self.assertEqual(sys.stdout.getvalue(), "") # type: ignore
|
||||
try:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
self.reporter.print_messages()
|
||||
self.assertEqual(sys.stdout.getvalue(), "")
|
||||
self.assertEqual(sys.stdout.getvalue(), "") # type: ignore
|
||||
|
||||
def test_tty_successful_exit(self):
|
||||
sys.stdout.isatty = lambda: True
|
||||
sys.stdout.isatty = lambda: True # type: ignore
|
||||
self._successful_exit_common()
|
||||
|
||||
def test_no_tty_successful_exit(self):
|
||||
self._successful_exit_common()
|
||||
|
||||
def test_tty_unsuccessful_exit(self):
|
||||
sys.stdout.isatty = lambda: True
|
||||
sys.stdout.isatty = lambda: True # type: ignore
|
||||
self._unsuccessful_exit_common()
|
||||
|
||||
def test_no_tty_unsuccessful_exit(self):
|
||||
@@ -55,7 +55,7 @@ class ReporterTest(unittest.TestCase):
|
||||
def _successful_exit_common(self):
|
||||
self._add_messages()
|
||||
self.reporter.print_messages()
|
||||
output = sys.stdout.getvalue()
|
||||
output = sys.stdout.getvalue() # type: ignore
|
||||
self.assertTrue("IMPORTANT NOTES:" in output)
|
||||
self.assertTrue("High" in output)
|
||||
self.assertTrue("Med" in output)
|
||||
@@ -67,7 +67,7 @@ class ReporterTest(unittest.TestCase):
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
self.reporter.print_messages()
|
||||
output = sys.stdout.getvalue()
|
||||
output = sys.stdout.getvalue() # type: ignore
|
||||
self.assertTrue("IMPORTANT NOTES:" in output)
|
||||
self.assertTrue("High" in output)
|
||||
self.assertTrue("Med" not in output)
|
||||
|
||||
@@ -20,6 +20,7 @@ from collections import OrderedDict
|
||||
|
||||
import configargparse
|
||||
|
||||
from acme.magic_typing import Tuple, Union # pylint: disable=unused-import, no-name-in-module
|
||||
from certbot import constants
|
||||
from certbot import errors
|
||||
from certbot import lock
|
||||
@@ -218,8 +219,12 @@ def safe_open(path, mode="w", chmod=None, buffering=None):
|
||||
|
||||
"""
|
||||
# pylint: disable=star-args
|
||||
open_args = () if chmod is None else (chmod,)
|
||||
fdopen_args = () if buffering is None else (buffering,)
|
||||
open_args = () # type: Union[Tuple[()], Tuple[int]]
|
||||
if chmod is not None:
|
||||
open_args = (chmod,)
|
||||
fdopen_args = () # type: Union[Tuple[()], Tuple[int]]
|
||||
if buffering is not None:
|
||||
fdopen_args = (buffering,)
|
||||
return os.fdopen(
|
||||
os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
|
||||
mode, *fdopen_args)
|
||||
@@ -303,9 +308,8 @@ def get_filtered_names(all_names):
|
||||
for name in all_names:
|
||||
try:
|
||||
filtered_names.add(enforce_le_validity(name))
|
||||
except errors.ConfigurationError as error:
|
||||
logger.debug('Not suggesting name "%s"', name)
|
||||
logger.debug(error)
|
||||
except errors.ConfigurationError:
|
||||
logger.debug('Not suggesting name "%s"', name, exc_info=True)
|
||||
return filtered_names
|
||||
|
||||
|
||||
|
||||
6
mypy.ini
6
mypy.ini
@@ -5,6 +5,12 @@ ignore_missing_imports = True
|
||||
[mypy-acme.*]
|
||||
check_untyped_defs = True
|
||||
|
||||
[mypy-acme.magic_typing_test]
|
||||
ignore_errors = True
|
||||
|
||||
[mypy-certbot.*]
|
||||
check_untyped_defs = True
|
||||
|
||||
[mypy-certbot_apache.*]
|
||||
check_untyped_defs = True
|
||||
|
||||
|
||||
2
setup.py
2
setup.py
@@ -34,7 +34,7 @@ version = meta['version']
|
||||
# specified here to avoid masking the more specific request requirements in
|
||||
# acme. See https://github.com/pypa/pip/issues/988 for more info.
|
||||
install_requires = [
|
||||
'acme>=0.22.1',
|
||||
'acme>0.24.0',
|
||||
# We technically need ConfigArgParse 0.10.0 for Python 2.6 support, but
|
||||
# saying so here causes a runtime error against our temporary fork of 0.9.3
|
||||
# in which we added 2.6 support (see #2243), so we relax the requirement.
|
||||
|
||||
@@ -30,7 +30,7 @@ josepy==1.0.1
|
||||
logger==1.4
|
||||
logilab-common==1.4.1
|
||||
MarkupSafe==1.0
|
||||
mypy==0.580
|
||||
mypy==0.600
|
||||
ndg-httpsclient==0.3.2
|
||||
oauth2client==2.0.0
|
||||
pathlib2==2.3.0
|
||||
|
||||
@@ -12,12 +12,18 @@ else
|
||||
pip_install="$(dirname $0)/pip_install_editable.sh"
|
||||
fi
|
||||
|
||||
temp_cwd=$(mktemp -d)
|
||||
trap "rm -rf $temp_cwd" EXIT
|
||||
|
||||
set -x
|
||||
for requirement in "$@" ; do
|
||||
$pip_install $requirement
|
||||
pkg=$(echo $requirement | cut -f1 -d\[) # remove any extras such as [dev]
|
||||
pkg=$(echo "$pkg" | tr - _ ) # convert package names to Python import names
|
||||
if [ $pkg = "." ]; then
|
||||
pkg="certbot"
|
||||
fi
|
||||
cd "$temp_cwd"
|
||||
pytest --numprocesses auto --quiet --pyargs $pkg
|
||||
cd -
|
||||
done
|
||||
|
||||
Reference in New Issue
Block a user