Compare commits

...

17 Commits

Author SHA1 Message Date
Erica Portnoy
8468039b5d support earlier mock 2018-02-21 20:57:13 -08:00
Erica Portnoy
e2a091d7d6 remove the correct dead code 2018-02-21 20:47:56 -08:00
Erica Portnoy
1400dbb44f remove dead code 2018-02-21 20:45:33 -08:00
Erica Portnoy
461f2d4c6a dump the wrapped cert 2018-02-21 20:44:37 -08:00
Erica Portnoy
54b5991875 remove ClientV1 passthrough 2018-02-21 20:22:27 -08:00
Erica Portnoy
dc0684b3e6 s/rytpe/rtype 2018-02-21 20:20:19 -08:00
Erica Portnoy
7bcb54d107 add tests for acme.client and change to fetch chain failure to TimeoutError 2018-02-21 20:16:24 -08:00
Erica Portnoy
66d6b15b95 Add test for certbot.crypto_util.cert_and_chain_from_fullchain 2018-02-21 18:03:25 -08:00
Erica Portnoy
c34cbb4a62 add test for acme.dump_pyopenssl_chain 2018-02-21 17:56:58 -08:00
Erica Portnoy
bdaeb2aedf update renewal call 2018-02-21 17:35:21 -08:00
Erica Portnoy
f2dbcd0edf remove correct import 2018-02-21 16:09:35 -08:00
Erica Portnoy
095b1e059d extraneous client_test imports 2018-02-21 16:04:13 -08:00
Erica Portnoy
074a2e9218 update certbot.tests.client_test.py 2018-02-21 16:02:47 -08:00
Erica Portnoy
c066a4d796 util methods and imports for finalize_order shim refactor 2018-02-21 14:04:44 -08:00
Erica Portnoy
ff1ba9b8c6 Merge branch 'master' into finalize_shim 2018-02-20 18:54:57 -08:00
Erica Portnoy
0a67e1a4e8 major structure of finalize_order shim refactor 2018-02-20 18:51:55 -08:00
Erica Portnoy
1f7b35e320 update order object with returned authorizations 2018-02-20 18:09:56 -08:00
11 changed files with 240 additions and 143 deletions

View File

@@ -677,9 +677,6 @@ class BackwardsCompatibleClientV2(object):
return getattr(self.client, name) return getattr(self.client, name)
elif name in dir(ClientBase): elif name in dir(ClientBase):
return getattr(self.client, name) return getattr(self.client, name)
# temporary, for breaking changes into smaller pieces
elif name in dir(Client):
return getattr(self.client, name)
else: else:
raise AttributeError() raise AttributeError()
@@ -723,10 +720,48 @@ class BackwardsCompatibleClientV2(object):
authorizations = [] authorizations = []
for domain in dnsNames: for domain in dnsNames:
authorizations.append(self.client.request_domain_challenges(domain)) authorizations.append(self.client.request_domain_challenges(domain))
return messages.OrderResource(authorizations=authorizations) return messages.OrderResource(authorizations=authorizations, csr_pem=csr_pem)
else: else:
return self.client.new_order(csr_pem) return self.client.new_order(csr_pem)
def finalize_order(self, orderr, deadline):
"""Finalize an order and obtain a certificate.
:param messages.OrderResource orderr: order to finalize
:param datetime.datetime deadline: when to stop polling and timeout
:returns: finalized order
:rtype: messages.OrderResource
"""
if self.acme_version == 1:
csr_pem = orderr.csr_pem
certr = self.client.request_issuance(
jose.ComparableX509(
OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr_pem)),
orderr.authorizations)
chain = None
while datetime.datetime.now() < deadline:
try:
chain = self.client.fetch_chain(certr)
break
except errors.Error:
time.sleep(1)
if chain is None:
raise errors.TimeoutError(
'Failed to fetch chain. You should not deploy the generated '
'certificate, please rerun the command for a new one.')
cert = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped)
chain = crypto_util.dump_pyopenssl_chain(chain)
return orderr.update(fullchain_pem=(cert + chain))
else:
return self.client.finalize_order(orderr, deadline)
def _acme_version_from_directory(self, directory): def _acme_version_from_directory(self, directory):
if hasattr(directory, 'newNonce'): if hasattr(directory, 'newNonce'):
return 2 return 2

View File

@@ -8,6 +8,7 @@ from six.moves import http_client # pylint: disable=import-error
import josepy as jose import josepy as jose
import mock import mock
import OpenSSL
import requests import requests
from acme import challenges from acme import challenges
@@ -82,6 +83,29 @@ class ClientTestBase(unittest.TestCase):
class BackwardsCompatibleClientV2Test(ClientTestBase): class BackwardsCompatibleClientV2Test(ClientTestBase):
"""Tests for acme.client.BackwardsCompatibleClientV2.""" """Tests for acme.client.BackwardsCompatibleClientV2."""
def setUp(self):
super(BackwardsCompatibleClientV2Test, self).setUp()
# contains a loaded cert
self.certr = messages.CertificateResource(
body=messages_test.CERT)
loaded = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, CERT_SAN_PEM)
wrapped = jose.ComparableX509(loaded)
self.chain = [wrapped, wrapped]
self.cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, messages_test.CERT.wrapped)
single_chain = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, loaded)
self.chain_pem = single_chain + single_chain
self.fullchain_pem = self.cert_pem + self.chain_pem
self.orderr = messages.OrderResource(
csr_pem=CSR_SAN_PEM)
def _init(self): def _init(self):
uri = 'http://www.letsencrypt-demo.org/directory' uri = 'http://www.letsencrypt-demo.org/directory'
from acme.client import BackwardsCompatibleClientV2 from acme.client import BackwardsCompatibleClientV2
@@ -109,8 +133,6 @@ class BackwardsCompatibleClientV2Test(ClientTestBase):
client = self._init() client = self._init()
self.assertEqual(client.directory, client.client.directory) self.assertEqual(client.directory, client.client.directory)
self.assertEqual(client.key, KEY) self.assertEqual(client.key, KEY)
# delete this line once we finish migrating to new API:
self.assertEqual(client.register, client.client.register)
self.assertEqual(client.update_registration, client.client.update_registration) self.assertEqual(client.update_registration, client.client.update_registration)
self.assertRaises(AttributeError, client.__getattr__, 'nonexistent') self.assertRaises(AttributeError, client.__getattr__, 'nonexistent')
self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos') self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos')
@@ -182,7 +204,52 @@ class BackwardsCompatibleClientV2Test(ClientTestBase):
client.new_order(mock_csr_pem) client.new_order(mock_csr_pem)
mock_client().new_order.assert_called_once_with(mock_csr_pem) mock_client().new_order.assert_called_once_with(mock_csr_pem)
@mock.patch('acme.client.Client')
def test_finalize_order_v1_success(self, mock_client):
self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr
mock_client().fetch_chain.return_value = self.chain
deadline = datetime.datetime(9999, 9, 9)
client = self._init()
result = client.finalize_order(self.orderr, deadline)
self.assertEqual(result.fullchain_pem, self.fullchain_pem)
mock_client().fetch_chain.assert_called_once_with(self.certr)
@mock.patch('acme.client.Client')
def test_finalize_order_v1_fetch_chain_error(self, mock_client):
self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr
mock_client().fetch_chain.return_value = self.chain
mock_client().fetch_chain.side_effect = [errors.Error, self.chain]
deadline = datetime.datetime(9999, 9, 9)
client = self._init()
result = client.finalize_order(self.orderr, deadline)
self.assertEqual(result.fullchain_pem, self.fullchain_pem)
self.assertEqual(mock_client().fetch_chain.call_count, 2)
@mock.patch('acme.client.Client')
def test_finalize_order_v1_timeout(self, mock_client):
self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr
deadline = deadline = datetime.datetime.now() - datetime.timedelta(seconds=60)
client = self._init()
self.assertRaises(errors.TimeoutError, client.finalize_order,
self.orderr, deadline)
def test_finalize_order_v2(self):
self.response.json.return_value = DIRECTORY_V2.to_json()
mock_orderr = mock.MagicMock()
mock_deadline = mock.MagicMock()
with mock.patch('acme.client.ClientV2') as mock_client:
client = self._init()
client.finalize_order(mock_orderr, mock_deadline)
mock_client().finalize_order.assert_called_once_with(mock_orderr, mock_deadline)
class ClientTest(ClientTestBase): class ClientTest(ClientTestBase):

View File

@@ -8,6 +8,8 @@ import socket
import sys import sys
import OpenSSL import OpenSSL
import josepy as jose
from acme import errors from acme import errors
@@ -280,3 +282,23 @@ def gen_ss_cert(key, domains, not_before=None,
cert.set_pubkey(key) cert.set_pubkey(key)
cert.sign(key, "sha256") cert.sign(key, "sha256")
return cert return cert
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle.
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
:class:`josepy.util.ComparableX509`).
"""
# XXX: returns empty string when no chain is available, which
# shuts up RenewableCert, but might not be the best solution...
def _dump_cert(cert):
if isinstance(cert, jose.ComparableX509):
# pylint: disable=protected-access
cert = cert.wrapped
return OpenSSL.crypto.dump_certificate(filetype, cert)
# assumes that OpenSSL.crypto.dump_certificate includes ending
# newline character
return b"".join(_dump_cert(cert) for cert in chain)

View File

@@ -225,5 +225,33 @@ class MakeCSRTest(unittest.TestCase):
self.assertEqual(len(must_staple_exts), 1, self.assertEqual(len(must_staple_exts), 1,
"Expected exactly one Must Staple extension") "Expected exactly one Must Staple extension")
class DumpPyopensslChainTest(unittest.TestCase):
"""Test for dump_pyopenssl_chain."""
@classmethod
def _call(cls, loaded):
# pylint: disable=protected-access
from acme.crypto_util import dump_pyopenssl_chain
return dump_pyopenssl_chain(loaded)
def test_dump_pyopenssl_chain(self):
names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem']
loaded = [test_util.load_cert(name) for name in names]
length = sum(
len(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
for cert in loaded)
self.assertEqual(len(self._call(loaded)), length)
def test_dump_pyopenssl_chain_wrapped(self):
names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem']
loaded = [test_util.load_cert(name) for name in names]
wrap_func = jose.ComparableX509
wrapped = [wrap_func(cert) for cert in loaded]
dump_func = OpenSSL.crypto.dump_certificate
length = sum(len(dump_func(OpenSSL.crypto.FILETYPE_PEM, cert)) for cert in loaded)
self.assertEqual(len(self._call(wrapped)), length)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() # pragma: no cover unittest.main() # pragma: no cover

View File

@@ -1,4 +1,5 @@
"""Certbot client API.""" """Certbot client API."""
import datetime
import logging import logging
import os import os
import platform import platform
@@ -11,7 +12,6 @@ import zope.component
from acme import client as acme_client from acme import client as acme_client
from acme import crypto_util as acme_crypto_util from acme import crypto_util as acme_crypto_util
from acme import errors as acme_errors
from acme import messages from acme import messages
import certbot import certbot
@@ -243,8 +243,7 @@ class Client(object):
than `authkey`. than `authkey`.
:param acme.messages.OrderResource orderr: contains authzrs :param acme.messages.OrderResource orderr: contains authzrs
:returns: `.CertificateResource` and certificate chain (as :returns: certificate and chain as PEM strings
returned by `.fetch_chain`).
:rtype: tuple :rtype: tuple
""" """
@@ -264,32 +263,9 @@ class Client(object):
orderr = orderr.update(authorizations=authzr) orderr = orderr.update(authorizations=authzr)
authzr = orderr.authorizations authzr = orderr.authorizations
certr = self.acme.request_issuance( deadline = datetime.datetime.now() + datetime.timedelta(seconds=90)
jose.ComparableX509( orderr = self.acme.finalize_order(orderr, deadline)
OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr.data)), return crypto_util.cert_and_chain_from_fullchain(orderr.fullchain_pem)
authzr)
notify = zope.component.getUtility(interfaces.IDisplay).notification
retries = 0
chain = None
while retries <= 1:
if retries:
notify('Failed to fetch chain, please check your network '
'and continue', pause=True)
try:
chain = self.acme.fetch_chain(certr)
break
except acme_errors.Error:
logger.debug('Failed to fetch chain', exc_info=True)
retries += 1
if chain is None:
raise acme_errors.Error(
'Failed to fetch chain. You should not deploy the generated '
'certificate, please rerun the command for a new one.')
return certr, chain
def obtain_certificate(self, domains): def obtain_certificate(self, domains):
"""Obtains a certificate from the ACME server. """Obtains a certificate from the ACME server.
@@ -298,10 +274,9 @@ class Client(object):
:param list domains: domains to get a certificate :param list domains: domains to get a certificate
:returns: `.CertificateResource`, certificate chain (as :returns: :returns: certificate as PEM string, chain as PEM string,
returned by `.fetch_chain`), and newly generated private key newly generated private key (`.util.Key`), and DER-encoded
(`.util.Key`) and DER-encoded Certificate Signing Request Certificate Signing Request (`.util.CSR`).
(`.util.CSR`).
:rtype: tuple :rtype: tuple
""" """
@@ -329,9 +304,9 @@ class Client(object):
os.remove(csr.file) os.remove(csr.file)
return self.obtain_certificate(successful_domains) return self.obtain_certificate(successful_domains)
else: else:
certr, chain = self.obtain_certificate_from_csr(csr, orderr) cert, chain = self.obtain_certificate_from_csr(csr, orderr)
return certr, chain, key, csr return cert, chain, key, csr
# pylint: disable=no-member # pylint: disable=no-member
def obtain_and_enroll_certificate(self, domains, certname): def obtain_and_enroll_certificate(self, domains, certname):
@@ -350,7 +325,7 @@ class Client(object):
be obtained, or None if doing a successful dry run. be obtained, or None if doing a successful dry run.
""" """
certr, chain, key, _ = self.obtain_certificate(domains) cert, chain, key, _ = self.obtain_certificate(domains)
if (self.config.config_dir != constants.CLI_DEFAULTS["config_dir"] or if (self.config.config_dir != constants.CLI_DEFAULTS["config_dir"] or
self.config.work_dir != constants.CLI_DEFAULTS["work_dir"]): self.config.work_dir != constants.CLI_DEFAULTS["work_dir"]):
@@ -365,19 +340,16 @@ class Client(object):
return None return None
else: else:
return storage.RenewableCert.new_lineage( return storage.RenewableCert.new_lineage(
new_name, OpenSSL.crypto.dump_certificate( new_name, cert,
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped), key.pem, chain,
key.pem, crypto_util.dump_pyopenssl_chain(chain),
self.config) self.config)
def save_certificate(self, certr, chain_cert, def save_certificate(self, cert_pem, chain_pem,
cert_path, chain_path, fullchain_path): cert_path, chain_path, fullchain_path):
"""Saves the certificate received from the ACME server. """Saves the certificate received from the ACME server.
:param certr: ACME "certificate" resource. :param str cert_pem:
:type certr: :class:`acme.messages.Certificate` :param str chain_pem:
:param list chain_cert:
:param str cert_path: Candidate path to a certificate. :param str cert_path: Candidate path to a certificate.
:param str chain_path: Candidate path to a certificate chain. :param str chain_path: Candidate path to a certificate chain.
:param str fullchain_path: Candidate path to a full cert chain. :param str fullchain_path: Candidate path to a full cert chain.
@@ -394,8 +366,6 @@ class Client(object):
os.path.dirname(path), 0o755, os.geteuid(), os.path.dirname(path), 0o755, os.geteuid(),
self.config.strict_permissions) self.config.strict_permissions)
cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped)
cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path) cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path)
@@ -406,20 +376,15 @@ class Client(object):
logger.info("Server issued certificate; certificate written to %s", logger.info("Server issued certificate; certificate written to %s",
abs_cert_path) abs_cert_path)
if not chain_cert: chain_file, abs_chain_path =\
return abs_cert_path, None, None _open_pem_file('chain_path', chain_path)
else: fullchain_file, abs_fullchain_path =\
chain_pem = crypto_util.dump_pyopenssl_chain(chain_cert) _open_pem_file('fullchain_path', fullchain_path)
chain_file, abs_chain_path =\ _save_chain(chain_pem, chain_file)
_open_pem_file('chain_path', chain_path) _save_chain(cert_pem + chain_pem, fullchain_file)
fullchain_file, abs_fullchain_path =\
_open_pem_file('fullchain_path', fullchain_path)
_save_chain(chain_pem, chain_file) return abs_cert_path, abs_chain_path, abs_fullchain_path
_save_chain(cert_pem + chain_pem, fullchain_file)
return abs_cert_path, abs_chain_path, abs_fullchain_path
def deploy_certificate(self, domains, privkey_path, def deploy_certificate(self, domains, privkey_path,
cert_path, chain_path, fullchain_path): cert_path, chain_path, fullchain_path):

View File

@@ -14,7 +14,6 @@ import six
import zope.component import zope.component
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography import x509 from cryptography import x509
import josepy as jose
from acme import crypto_util as acme_crypto_util from acme import crypto_util as acme_crypto_util
@@ -367,16 +366,7 @@ def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
""" """
# XXX: returns empty string when no chain is available, which # XXX: returns empty string when no chain is available, which
# shuts up RenewableCert, but might not be the best solution... # shuts up RenewableCert, but might not be the best solution...
return acme_crypto_util.dump_pyopenssl_chain(chain, filetype)
def _dump_cert(cert):
if isinstance(cert, jose.ComparableX509):
# pylint: disable=protected-access
cert = cert.wrapped
return OpenSSL.crypto.dump_certificate(filetype, cert)
# assumes that OpenSSL.crypto.dump_certificate includes ending
# newline character
return b"".join(_dump_cert(cert) for cert in chain)
def notBefore(cert_path): def notBefore(cert_path):
@@ -443,3 +433,16 @@ def sha256sum(filename):
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
sha256.update(f.read()) sha256.update(f.read())
return sha256.hexdigest() return sha256.hexdigest()
def cert_and_chain_from_fullchain(fullchain_pem):
"""Split fullchain_pem into cert_pem and chain_pem
:param str fullchain_pem: concatenated cert + chain
:returns: tuple of string cert_pem and chain_pem
:rtype: tuple
"""
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem))
chain = fullchain_pem[len(cert):]
return (cert, chain)

View File

@@ -1064,13 +1064,13 @@ def _csr_get_and_save_cert(config, le_client):
""" """
csr, _ = config.actual_csr csr, _ = config.actual_csr
certr, chain = le_client.obtain_certificate_from_csr(csr) cert, chain = le_client.obtain_certificate_from_csr(csr)
if config.dry_run: if config.dry_run:
logger.debug( logger.debug(
"Dry run: skipping saving certificate to %s", config.cert_path) "Dry run: skipping saving certificate to %s", config.cert_path)
return None, None return None, None
cert_path, _, fullchain_path = le_client.save_certificate( cert_path, _, fullchain_path = le_client.save_certificate(
certr, chain, config.cert_path, config.chain_path, config.fullchain_path) cert, chain, config.cert_path, config.chain_path, config.fullchain_path)
return cert_path, fullchain_path return cert_path, fullchain_path
def renew_cert(config, plugins, lineage): def renew_cert(config, plugins, lineage):

View File

@@ -294,15 +294,12 @@ def renew_cert(config, domains, le_client, lineage):
_avoid_invalidating_lineage(config, lineage, original_server) _avoid_invalidating_lineage(config, lineage, original_server)
if not domains: if not domains:
domains = lineage.names() domains = lineage.names()
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains) new_cert, new_chain, new_key, _ = le_client.obtain_certificate(domains)
if config.dry_run: if config.dry_run:
logger.debug("Dry run: skipping updating lineage at %s", logger.debug("Dry run: skipping updating lineage at %s",
os.path.dirname(lineage.cert)) os.path.dirname(lineage.cert))
else: else:
prior_version = lineage.latest_common_version() prior_version = lineage.latest_common_version()
new_cert = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_certr.body.wrapped)
new_chain = crypto_util.dump_pyopenssl_chain(new_chain)
# TODO: Check return value of save_successor # TODO: Check return value of save_successor
lineage.save_successor(prior_version, new_cert, new_key.pem, new_chain, config) lineage.save_successor(prior_version, new_cert, new_key.pem, new_chain, config)
lineage.update_all_links_to(lineage.latest_common_version()) lineage.update_all_links_to(lineage.latest_common_version())

View File

@@ -4,12 +4,8 @@ import shutil
import tempfile import tempfile
import unittest import unittest
import josepy as jose
import OpenSSL
import mock import mock
from acme import errors as acme_errors
from certbot import account from certbot import account
from certbot import errors from certbot import errors
from certbot import util from certbot import util
@@ -134,7 +130,10 @@ class ClientTest(ClientTestCommon):
self.config.allow_subset_of_names = False self.config.allow_subset_of_names = False
self.config.dry_run = False self.config.dry_run = False
self.eg_domains = ["example.com", "www.example.com"] self.eg_domains = ["example.com", "www.example.com"]
self.eg_order = mock.MagicMock(authorizations=[None]) self.eg_order = mock.MagicMock(
authorizations=[None],
fullchain_pem=mock.sentinel.fullchain_pem,
csr_pem=mock.sentinel.csr_pem)
def test_init_acme_verify_ssl(self): def test_init_acme_verify_ssl(self):
net = self.acme_client.call_args[0][0] net = self.acme_client.call_args[0][0]
@@ -143,9 +142,9 @@ class ClientTest(ClientTestCommon):
def _mock_obtain_certificate(self): def _mock_obtain_certificate(self):
self.client.auth_handler = mock.MagicMock() self.client.auth_handler = mock.MagicMock()
self.client.auth_handler.handle_authorizations.return_value = [None] self.client.auth_handler.handle_authorizations.return_value = [None]
self.acme.request_issuance.return_value = mock.sentinel.certr self.acme.finalize_order.return_value = self.eg_order
self.acme.fetch_chain.return_value = mock.sentinel.chain
self.acme.new_order.return_value = self.eg_order self.acme.new_order.return_value = self.eg_order
self.eg_order.update.return_value = self.eg_order
def _check_obtain_certificate(self, auth_count=1): def _check_obtain_certificate(self, auth_count=1):
if auth_count == 1: if auth_count == 1:
@@ -155,27 +154,24 @@ class ClientTest(ClientTestCommon):
else: else:
self.assertEqual(self.client.auth_handler.handle_authorizations.call_count, auth_count) self.assertEqual(self.client.auth_handler.handle_authorizations.call_count, auth_count)
authzr = self.client.auth_handler.handle_authorizations() self.acme.finalize_order.assert_called_once_with(
self.eg_order, mock.ANY)
self.acme.request_issuance.assert_called_once_with(
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, CSR_SAN)),
authzr)
self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr)
@mock.patch("certbot.client.crypto_util")
@mock.patch("certbot.client.logger") @mock.patch("certbot.client.logger")
@test_util.patch_get_utility() @test_util.patch_get_utility()
def test_obtain_certificate_from_csr(self, unused_mock_get_utility, def test_obtain_certificate_from_csr(self, unused_mock_get_utility,
mock_logger): mock_logger, mock_crypto_util):
self._mock_obtain_certificate() self._mock_obtain_certificate()
test_csr = util.CSR(form="pem", file=None, data=CSR_SAN) test_csr = util.CSR(form="pem", file=None, data=CSR_SAN)
auth_handler = self.client.auth_handler auth_handler = self.client.auth_handler
mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
mock.sentinel.chain)
orderr = self.acme.new_order(test_csr.data) orderr = self.acme.new_order(test_csr.data)
auth_handler.handle_authorizations(orderr, False) auth_handler.handle_authorizations(orderr, False)
self.assertEqual( self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain), (mock.sentinel.cert, mock.sentinel.chain),
self.client.obtain_certificate_from_csr( self.client.obtain_certificate_from_csr(
test_csr, test_csr,
orderr=orderr)) orderr=orderr))
@@ -184,7 +180,7 @@ class ClientTest(ClientTestCommon):
# Test for orderr=None # Test for orderr=None
self.assertEqual( self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain), (mock.sentinel.cert, mock.sentinel.chain),
self.client.obtain_certificate_from_csr( self.client.obtain_certificate_from_csr(
test_csr, test_csr,
orderr=None)) orderr=None))
@@ -198,41 +194,13 @@ class ClientTest(ClientTestCommon):
test_csr) test_csr)
mock_logger.warning.assert_called_once_with(mock.ANY) mock_logger.warning.assert_called_once_with(mock.ANY)
@test_util.patch_get_utility()
def test_obtain_certificate_from_csr_retry_succeeded(
self, mock_get_utility):
self._mock_obtain_certificate()
self.acme.fetch_chain.side_effect = [acme_errors.Error,
mock.sentinel.chain]
test_csr = util.CSR(form="der", file=None, data=CSR_SAN)
orderr = self.acme.new_order(test_csr.data)
self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
test_csr,
orderr=orderr))
self.assertEqual(1, mock_get_utility().notification.call_count)
@test_util.patch_get_utility()
def test_obtain_certificate_from_csr_retry_failed(self, mock_get_utility):
self._mock_obtain_certificate()
self.acme.fetch_chain.side_effect = acme_errors.Error
test_csr = util.CSR(form="der", file=None, data=CSR_SAN)
orderr = self.acme.new_order(test_csr.data)
self.assertRaises(
acme_errors.Error,
self.client.obtain_certificate_from_csr,
test_csr,
orderr=orderr)
self.assertEqual(1, mock_get_utility().notification.call_count)
@mock.patch("certbot.client.crypto_util") @mock.patch("certbot.client.crypto_util")
def test_obtain_certificate(self, mock_crypto_util): def test_obtain_certificate(self, mock_crypto_util):
csr = util.CSR(form="pem", file=None, data=CSR_SAN) csr = util.CSR(form="pem", file=None, data=CSR_SAN)
mock_crypto_util.init_save_csr.return_value = csr mock_crypto_util.init_save_csr.return_value = csr
mock_crypto_util.init_save_key.return_value = mock.sentinel.key mock_crypto_util.init_save_key.return_value = mock.sentinel.key
mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
mock.sentinel.chain)
self._test_obtain_certificate_common(mock.sentinel.key, csr) self._test_obtain_certificate_common(mock.sentinel.key, csr)
@@ -240,6 +208,8 @@ class ClientTest(ClientTestCommon):
self.config.rsa_key_size, self.config.key_dir) self.config.rsa_key_size, self.config.key_dir)
mock_crypto_util.init_save_csr.assert_called_once_with( mock_crypto_util.init_save_csr.assert_called_once_with(
mock.sentinel.key, self.eg_domains, self.config.csr_dir) mock.sentinel.key, self.eg_domains, self.config.csr_dir)
mock_crypto_util.cert_and_chain_from_fullchain.assert_called_once_with(
mock.sentinel.fullchain_pem)
@mock.patch("certbot.client.crypto_util") @mock.patch("certbot.client.crypto_util")
@mock.patch("os.remove") @mock.patch("os.remove")
@@ -248,6 +218,8 @@ class ClientTest(ClientTestCommon):
key = util.CSR(form="pem", file=mock.sentinel.key_file, data=CSR_SAN) key = util.CSR(form="pem", file=mock.sentinel.key_file, data=CSR_SAN)
mock_crypto_util.init_save_csr.return_value = csr mock_crypto_util.init_save_csr.return_value = csr
mock_crypto_util.init_save_key.return_value = key mock_crypto_util.init_save_key.return_value = key
mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
mock.sentinel.chain)
authzr = self._authzr_from_domains(["example.com"]) authzr = self._authzr_from_domains(["example.com"])
self._test_obtain_certificate_common(key, csr, authzr_ret=authzr, auth_count=2) self._test_obtain_certificate_common(key, csr, authzr_ret=authzr, auth_count=2)
@@ -255,6 +227,7 @@ class ClientTest(ClientTestCommon):
self.assertEqual(mock_crypto_util.init_save_key.call_count, 2) self.assertEqual(mock_crypto_util.init_save_key.call_count, 2)
self.assertEqual(mock_crypto_util.init_save_csr.call_count, 2) self.assertEqual(mock_crypto_util.init_save_csr.call_count, 2)
self.assertEqual(mock_remove.call_count, 2) self.assertEqual(mock_remove.call_count, 2)
self.assertEqual(mock_crypto_util.cert_and_chain_from_fullchain.call_count, 1)
@mock.patch("certbot.client.crypto_util") @mock.patch("certbot.client.crypto_util")
@mock.patch("certbot.client.acme_crypto_util") @mock.patch("certbot.client.acme_crypto_util")
@@ -263,6 +236,8 @@ class ClientTest(ClientTestCommon):
mock_acme_crypto.make_csr.return_value = CSR_SAN mock_acme_crypto.make_csr.return_value = CSR_SAN
mock_crypto.make_key.return_value = mock.sentinel.key_pem mock_crypto.make_key.return_value = mock.sentinel.key_pem
key = util.Key(file=None, pem=mock.sentinel.key_pem) key = util.Key(file=None, pem=mock.sentinel.key_pem)
mock_crypto.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
mock.sentinel.chain)
self.client.config.dry_run = True self.client.config.dry_run = True
self._test_obtain_certificate_common(key, csr) self._test_obtain_certificate_common(key, csr)
@@ -272,6 +247,7 @@ class ClientTest(ClientTestCommon):
mock.sentinel.key_pem, self.eg_domains, self.config.must_staple) mock.sentinel.key_pem, self.eg_domains, self.config.must_staple)
mock_crypto.init_save_key.assert_not_called() mock_crypto.init_save_key.assert_not_called()
mock_crypto.init_save_csr.assert_not_called() mock_crypto.init_save_csr.assert_not_called()
self.assertEqual(mock_crypto.cert_and_chain_from_fullchain.call_count, 1)
def _authzr_from_domains(self, domains): def _authzr_from_domains(self, domains):
authzr = [] authzr = []
@@ -294,7 +270,6 @@ class ClientTest(ClientTestCommon):
authzr = authzr_ret or self._authzr_from_domains(self.eg_domains) authzr = authzr_ret or self._authzr_from_domains(self.eg_domains)
self.eg_order.authorizations = authzr self.eg_order.authorizations = authzr
self.eg_order.update().authorizations = authzr
self.client.auth_handler.handle_authorizations.return_value = authzr self.client.auth_handler.handle_authorizations.return_value = authzr
with test_util.patch_get_utility(): with test_util.patch_get_utility():
@@ -302,13 +277,12 @@ class ClientTest(ClientTestCommon):
self.assertEqual( self.assertEqual(
result, result,
(mock.sentinel.certr, mock.sentinel.chain, key, csr)) (mock.sentinel.cert, mock.sentinel.chain, key, csr))
self._check_obtain_certificate(auth_count) self._check_obtain_certificate(auth_count)
@mock.patch('certbot.client.Client.obtain_certificate') @mock.patch('certbot.client.Client.obtain_certificate')
@mock.patch('certbot.storage.RenewableCert.new_lineage') @mock.patch('certbot.storage.RenewableCert.new_lineage')
@mock.patch('OpenSSL.crypto.dump_certificate') def test_obtain_and_enroll_certificate(self,
def test_obtain_and_enroll_certificate(self, mock_dump_certificate,
mock_storage, mock_obtain_certificate): mock_storage, mock_obtain_certificate):
domains = ["example.com", "www.example.com"] domains = ["example.com", "www.example.com"]
mock_obtain_certificate.return_value = (mock.MagicMock(), mock_obtain_certificate.return_value = (mock.MagicMock(),
@@ -324,7 +298,6 @@ class ClientTest(ClientTestCommon):
self.assertFalse(self.client.obtain_and_enroll_certificate(domains, None)) self.assertFalse(self.client.obtain_and_enroll_certificate(domains, None))
self.assertTrue(mock_storage.call_count == 2) self.assertTrue(mock_storage.call_count == 2)
self.assertTrue(mock_dump_certificate.call_count == 2)
@mock.patch("certbot.cli.helpful_parser") @mock.patch("certbot.cli.helpful_parser")
def test_save_certificate(self, mock_parser): def test_save_certificate(self, mock_parser):
@@ -333,9 +306,8 @@ class ClientTest(ClientTestCommon):
tmp_path = tempfile.mkdtemp() tmp_path = tempfile.mkdtemp()
os.chmod(tmp_path, 0o755) # TODO: really?? os.chmod(tmp_path, 0o755) # TODO: really??
certr = mock.MagicMock(body=test_util.load_comparable_cert(certs[0])) cert_pem = test_util.load_vector(certs[0])
chain_cert = [test_util.load_comparable_cert(certs[0]), chain_pem = (test_util.load_vector(certs[0]) + test_util.load_vector(certs[1]))
test_util.load_comparable_cert(certs[1])]
candidate_cert_path = os.path.join(tmp_path, "certs", "cert_512.pem") candidate_cert_path = os.path.join(tmp_path, "certs", "cert_512.pem")
candidate_chain_path = os.path.join(tmp_path, "chains", "chain.pem") candidate_chain_path = os.path.join(tmp_path, "chains", "chain.pem")
candidate_fullchain_path = os.path.join(tmp_path, "chains", "fullchain.pem") candidate_fullchain_path = os.path.join(tmp_path, "chains", "fullchain.pem")
@@ -345,7 +317,7 @@ class ClientTest(ClientTestCommon):
"--fullchain-path", candidate_fullchain_path] "--fullchain-path", candidate_fullchain_path]
cert_path, chain_path, fullchain_path = self.client.save_certificate( cert_path, chain_path, fullchain_path = self.client.save_certificate(
certr, chain_cert, candidate_cert_path, candidate_chain_path, cert_pem, chain_pem, candidate_cert_path, candidate_chain_path,
candidate_fullchain_path) candidate_fullchain_path)
self.assertEqual(os.path.dirname(cert_path), self.assertEqual(os.path.dirname(cert_path),

View File

@@ -373,5 +373,18 @@ class Sha256sumTest(unittest.TestCase):
'914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e') '914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e')
class CertAndChainFromFullchainTest(unittest.TestCase):
"""Tests for certbot.crypto_util.cert_and_chain_from_fullchain"""
def test_cert_and_chain_from_fullchain(self):
cert_pem = CERT
chain_pem = CERT + SS_CERT
fullchain_pem = cert_pem + chain_pem
from certbot.crypto_util import cert_and_chain_from_fullchain
cert_out, chain_out = cert_and_chain_from_fullchain(fullchain_pem)
self.assertEqual(cert_out, cert_pem)
self.assertEqual(chain_out, chain_pem)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() # pragma: no cover unittest.main() # pragma: no cover

View File

@@ -57,11 +57,6 @@ def load_cert(*names):
return OpenSSL.crypto.load_certificate(loader, load_vector(*names)) return OpenSSL.crypto.load_certificate(loader, load_vector(*names))
def load_comparable_cert(*names):
"""Load ComparableX509 cert."""
return jose.ComparableX509(load_cert(*names))
def load_csr(*names): def load_csr(*names):
"""Load certificate request.""" """Load certificate request."""
loader = _guess_loader( loader = _guess_loader(