Compare commits
17 Commits
update-dev
...
finalize_s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8468039b5d | ||
|
|
e2a091d7d6 | ||
|
|
1400dbb44f | ||
|
|
461f2d4c6a | ||
|
|
54b5991875 | ||
|
|
dc0684b3e6 | ||
|
|
7bcb54d107 | ||
|
|
66d6b15b95 | ||
|
|
c34cbb4a62 | ||
|
|
bdaeb2aedf | ||
|
|
f2dbcd0edf | ||
|
|
095b1e059d | ||
|
|
074a2e9218 | ||
|
|
c066a4d796 | ||
|
|
ff1ba9b8c6 | ||
|
|
0a67e1a4e8 | ||
|
|
1f7b35e320 |
@@ -677,9 +677,6 @@ class BackwardsCompatibleClientV2(object):
|
||||
return getattr(self.client, name)
|
||||
elif name in dir(ClientBase):
|
||||
return getattr(self.client, name)
|
||||
# temporary, for breaking changes into smaller pieces
|
||||
elif name in dir(Client):
|
||||
return getattr(self.client, name)
|
||||
else:
|
||||
raise AttributeError()
|
||||
|
||||
@@ -723,10 +720,48 @@ class BackwardsCompatibleClientV2(object):
|
||||
authorizations = []
|
||||
for domain in dnsNames:
|
||||
authorizations.append(self.client.request_domain_challenges(domain))
|
||||
return messages.OrderResource(authorizations=authorizations)
|
||||
return messages.OrderResource(authorizations=authorizations, csr_pem=csr_pem)
|
||||
else:
|
||||
return self.client.new_order(csr_pem)
|
||||
|
||||
def finalize_order(self, orderr, deadline):
|
||||
"""Finalize an order and obtain a certificate.
|
||||
|
||||
:param messages.OrderResource orderr: order to finalize
|
||||
:param datetime.datetime deadline: when to stop polling and timeout
|
||||
|
||||
:returns: finalized order
|
||||
:rtype: messages.OrderResource
|
||||
|
||||
"""
|
||||
if self.acme_version == 1:
|
||||
csr_pem = orderr.csr_pem
|
||||
certr = self.client.request_issuance(
|
||||
jose.ComparableX509(
|
||||
OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr_pem)),
|
||||
orderr.authorizations)
|
||||
|
||||
chain = None
|
||||
while datetime.datetime.now() < deadline:
|
||||
try:
|
||||
chain = self.client.fetch_chain(certr)
|
||||
break
|
||||
except errors.Error:
|
||||
time.sleep(1)
|
||||
|
||||
if chain is None:
|
||||
raise errors.TimeoutError(
|
||||
'Failed to fetch chain. You should not deploy the generated '
|
||||
'certificate, please rerun the command for a new one.')
|
||||
|
||||
cert = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped)
|
||||
chain = crypto_util.dump_pyopenssl_chain(chain)
|
||||
|
||||
return orderr.update(fullchain_pem=(cert + chain))
|
||||
else:
|
||||
return self.client.finalize_order(orderr, deadline)
|
||||
|
||||
def _acme_version_from_directory(self, directory):
|
||||
if hasattr(directory, 'newNonce'):
|
||||
return 2
|
||||
|
||||
@@ -8,6 +8,7 @@ from six.moves import http_client # pylint: disable=import-error
|
||||
|
||||
import josepy as jose
|
||||
import mock
|
||||
import OpenSSL
|
||||
import requests
|
||||
|
||||
from acme import challenges
|
||||
@@ -82,6 +83,29 @@ class ClientTestBase(unittest.TestCase):
|
||||
class BackwardsCompatibleClientV2Test(ClientTestBase):
|
||||
"""Tests for acme.client.BackwardsCompatibleClientV2."""
|
||||
|
||||
def setUp(self):
|
||||
super(BackwardsCompatibleClientV2Test, self).setUp()
|
||||
# contains a loaded cert
|
||||
self.certr = messages.CertificateResource(
|
||||
body=messages_test.CERT)
|
||||
|
||||
loaded = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, CERT_SAN_PEM)
|
||||
wrapped = jose.ComparableX509(loaded)
|
||||
self.chain = [wrapped, wrapped]
|
||||
|
||||
self.cert_pem = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, messages_test.CERT.wrapped)
|
||||
|
||||
single_chain = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, loaded)
|
||||
self.chain_pem = single_chain + single_chain
|
||||
|
||||
self.fullchain_pem = self.cert_pem + self.chain_pem
|
||||
|
||||
self.orderr = messages.OrderResource(
|
||||
csr_pem=CSR_SAN_PEM)
|
||||
|
||||
def _init(self):
|
||||
uri = 'http://www.letsencrypt-demo.org/directory'
|
||||
from acme.client import BackwardsCompatibleClientV2
|
||||
@@ -109,8 +133,6 @@ class BackwardsCompatibleClientV2Test(ClientTestBase):
|
||||
client = self._init()
|
||||
self.assertEqual(client.directory, client.client.directory)
|
||||
self.assertEqual(client.key, KEY)
|
||||
# delete this line once we finish migrating to new API:
|
||||
self.assertEqual(client.register, client.client.register)
|
||||
self.assertEqual(client.update_registration, client.client.update_registration)
|
||||
self.assertRaises(AttributeError, client.__getattr__, 'nonexistent')
|
||||
self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos')
|
||||
@@ -182,7 +204,52 @@ class BackwardsCompatibleClientV2Test(ClientTestBase):
|
||||
client.new_order(mock_csr_pem)
|
||||
mock_client().new_order.assert_called_once_with(mock_csr_pem)
|
||||
|
||||
@mock.patch('acme.client.Client')
|
||||
def test_finalize_order_v1_success(self, mock_client):
|
||||
self.response.json.return_value = DIRECTORY_V1.to_json()
|
||||
|
||||
mock_client().request_issuance.return_value = self.certr
|
||||
mock_client().fetch_chain.return_value = self.chain
|
||||
|
||||
deadline = datetime.datetime(9999, 9, 9)
|
||||
client = self._init()
|
||||
result = client.finalize_order(self.orderr, deadline)
|
||||
self.assertEqual(result.fullchain_pem, self.fullchain_pem)
|
||||
mock_client().fetch_chain.assert_called_once_with(self.certr)
|
||||
|
||||
@mock.patch('acme.client.Client')
|
||||
def test_finalize_order_v1_fetch_chain_error(self, mock_client):
|
||||
self.response.json.return_value = DIRECTORY_V1.to_json()
|
||||
|
||||
mock_client().request_issuance.return_value = self.certr
|
||||
mock_client().fetch_chain.return_value = self.chain
|
||||
mock_client().fetch_chain.side_effect = [errors.Error, self.chain]
|
||||
|
||||
deadline = datetime.datetime(9999, 9, 9)
|
||||
client = self._init()
|
||||
result = client.finalize_order(self.orderr, deadline)
|
||||
self.assertEqual(result.fullchain_pem, self.fullchain_pem)
|
||||
self.assertEqual(mock_client().fetch_chain.call_count, 2)
|
||||
|
||||
@mock.patch('acme.client.Client')
|
||||
def test_finalize_order_v1_timeout(self, mock_client):
|
||||
self.response.json.return_value = DIRECTORY_V1.to_json()
|
||||
|
||||
mock_client().request_issuance.return_value = self.certr
|
||||
|
||||
deadline = deadline = datetime.datetime.now() - datetime.timedelta(seconds=60)
|
||||
client = self._init()
|
||||
self.assertRaises(errors.TimeoutError, client.finalize_order,
|
||||
self.orderr, deadline)
|
||||
|
||||
def test_finalize_order_v2(self):
|
||||
self.response.json.return_value = DIRECTORY_V2.to_json()
|
||||
mock_orderr = mock.MagicMock()
|
||||
mock_deadline = mock.MagicMock()
|
||||
with mock.patch('acme.client.ClientV2') as mock_client:
|
||||
client = self._init()
|
||||
client.finalize_order(mock_orderr, mock_deadline)
|
||||
mock_client().finalize_order.assert_called_once_with(mock_orderr, mock_deadline)
|
||||
|
||||
|
||||
class ClientTest(ClientTestBase):
|
||||
|
||||
@@ -8,6 +8,8 @@ import socket
|
||||
import sys
|
||||
|
||||
import OpenSSL
|
||||
import josepy as jose
|
||||
|
||||
|
||||
from acme import errors
|
||||
|
||||
@@ -280,3 +282,23 @@ def gen_ss_cert(key, domains, not_before=None,
|
||||
cert.set_pubkey(key)
|
||||
cert.sign(key, "sha256")
|
||||
return cert
|
||||
|
||||
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
|
||||
"""Dump certificate chain into a bundle.
|
||||
|
||||
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
|
||||
:class:`josepy.util.ComparableX509`).
|
||||
|
||||
"""
|
||||
# XXX: returns empty string when no chain is available, which
|
||||
# shuts up RenewableCert, but might not be the best solution...
|
||||
|
||||
def _dump_cert(cert):
|
||||
if isinstance(cert, jose.ComparableX509):
|
||||
# pylint: disable=protected-access
|
||||
cert = cert.wrapped
|
||||
return OpenSSL.crypto.dump_certificate(filetype, cert)
|
||||
|
||||
# assumes that OpenSSL.crypto.dump_certificate includes ending
|
||||
# newline character
|
||||
return b"".join(_dump_cert(cert) for cert in chain)
|
||||
|
||||
@@ -225,5 +225,33 @@ class MakeCSRTest(unittest.TestCase):
|
||||
self.assertEqual(len(must_staple_exts), 1,
|
||||
"Expected exactly one Must Staple extension")
|
||||
|
||||
|
||||
class DumpPyopensslChainTest(unittest.TestCase):
|
||||
"""Test for dump_pyopenssl_chain."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, loaded):
|
||||
# pylint: disable=protected-access
|
||||
from acme.crypto_util import dump_pyopenssl_chain
|
||||
return dump_pyopenssl_chain(loaded)
|
||||
|
||||
def test_dump_pyopenssl_chain(self):
|
||||
names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem']
|
||||
loaded = [test_util.load_cert(name) for name in names]
|
||||
length = sum(
|
||||
len(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
|
||||
for cert in loaded)
|
||||
self.assertEqual(len(self._call(loaded)), length)
|
||||
|
||||
def test_dump_pyopenssl_chain_wrapped(self):
|
||||
names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem']
|
||||
loaded = [test_util.load_cert(name) for name in names]
|
||||
wrap_func = jose.ComparableX509
|
||||
wrapped = [wrap_func(cert) for cert in loaded]
|
||||
dump_func = OpenSSL.crypto.dump_certificate
|
||||
length = sum(len(dump_func(OpenSSL.crypto.FILETYPE_PEM, cert)) for cert in loaded)
|
||||
self.assertEqual(len(self._call(wrapped)), length)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
"""Certbot client API."""
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
@@ -11,7 +12,6 @@ import zope.component
|
||||
|
||||
from acme import client as acme_client
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
from acme import errors as acme_errors
|
||||
from acme import messages
|
||||
|
||||
import certbot
|
||||
@@ -243,8 +243,7 @@ class Client(object):
|
||||
than `authkey`.
|
||||
:param acme.messages.OrderResource orderr: contains authzrs
|
||||
|
||||
:returns: `.CertificateResource` and certificate chain (as
|
||||
returned by `.fetch_chain`).
|
||||
:returns: certificate and chain as PEM strings
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
@@ -264,32 +263,9 @@ class Client(object):
|
||||
orderr = orderr.update(authorizations=authzr)
|
||||
authzr = orderr.authorizations
|
||||
|
||||
certr = self.acme.request_issuance(
|
||||
jose.ComparableX509(
|
||||
OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr.data)),
|
||||
authzr)
|
||||
|
||||
notify = zope.component.getUtility(interfaces.IDisplay).notification
|
||||
retries = 0
|
||||
chain = None
|
||||
|
||||
while retries <= 1:
|
||||
if retries:
|
||||
notify('Failed to fetch chain, please check your network '
|
||||
'and continue', pause=True)
|
||||
try:
|
||||
chain = self.acme.fetch_chain(certr)
|
||||
break
|
||||
except acme_errors.Error:
|
||||
logger.debug('Failed to fetch chain', exc_info=True)
|
||||
retries += 1
|
||||
|
||||
if chain is None:
|
||||
raise acme_errors.Error(
|
||||
'Failed to fetch chain. You should not deploy the generated '
|
||||
'certificate, please rerun the command for a new one.')
|
||||
|
||||
return certr, chain
|
||||
deadline = datetime.datetime.now() + datetime.timedelta(seconds=90)
|
||||
orderr = self.acme.finalize_order(orderr, deadline)
|
||||
return crypto_util.cert_and_chain_from_fullchain(orderr.fullchain_pem)
|
||||
|
||||
def obtain_certificate(self, domains):
|
||||
"""Obtains a certificate from the ACME server.
|
||||
@@ -298,10 +274,9 @@ class Client(object):
|
||||
|
||||
:param list domains: domains to get a certificate
|
||||
|
||||
:returns: `.CertificateResource`, certificate chain (as
|
||||
returned by `.fetch_chain`), and newly generated private key
|
||||
(`.util.Key`) and DER-encoded Certificate Signing Request
|
||||
(`.util.CSR`).
|
||||
:returns: :returns: certificate as PEM string, chain as PEM string,
|
||||
newly generated private key (`.util.Key`), and DER-encoded
|
||||
Certificate Signing Request (`.util.CSR`).
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
@@ -329,9 +304,9 @@ class Client(object):
|
||||
os.remove(csr.file)
|
||||
return self.obtain_certificate(successful_domains)
|
||||
else:
|
||||
certr, chain = self.obtain_certificate_from_csr(csr, orderr)
|
||||
cert, chain = self.obtain_certificate_from_csr(csr, orderr)
|
||||
|
||||
return certr, chain, key, csr
|
||||
return cert, chain, key, csr
|
||||
|
||||
# pylint: disable=no-member
|
||||
def obtain_and_enroll_certificate(self, domains, certname):
|
||||
@@ -350,7 +325,7 @@ class Client(object):
|
||||
be obtained, or None if doing a successful dry run.
|
||||
|
||||
"""
|
||||
certr, chain, key, _ = self.obtain_certificate(domains)
|
||||
cert, chain, key, _ = self.obtain_certificate(domains)
|
||||
|
||||
if (self.config.config_dir != constants.CLI_DEFAULTS["config_dir"] or
|
||||
self.config.work_dir != constants.CLI_DEFAULTS["work_dir"]):
|
||||
@@ -365,19 +340,16 @@ class Client(object):
|
||||
return None
|
||||
else:
|
||||
return storage.RenewableCert.new_lineage(
|
||||
new_name, OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped),
|
||||
key.pem, crypto_util.dump_pyopenssl_chain(chain),
|
||||
new_name, cert,
|
||||
key.pem, chain,
|
||||
self.config)
|
||||
|
||||
def save_certificate(self, certr, chain_cert,
|
||||
def save_certificate(self, cert_pem, chain_pem,
|
||||
cert_path, chain_path, fullchain_path):
|
||||
"""Saves the certificate received from the ACME server.
|
||||
|
||||
:param certr: ACME "certificate" resource.
|
||||
:type certr: :class:`acme.messages.Certificate`
|
||||
|
||||
:param list chain_cert:
|
||||
:param str cert_pem:
|
||||
:param str chain_pem:
|
||||
:param str cert_path: Candidate path to a certificate.
|
||||
:param str chain_path: Candidate path to a certificate chain.
|
||||
:param str fullchain_path: Candidate path to a full cert chain.
|
||||
@@ -394,8 +366,6 @@ class Client(object):
|
||||
os.path.dirname(path), 0o755, os.geteuid(),
|
||||
self.config.strict_permissions)
|
||||
|
||||
cert_pem = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped)
|
||||
|
||||
cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path)
|
||||
|
||||
@@ -406,20 +376,15 @@ class Client(object):
|
||||
logger.info("Server issued certificate; certificate written to %s",
|
||||
abs_cert_path)
|
||||
|
||||
if not chain_cert:
|
||||
return abs_cert_path, None, None
|
||||
else:
|
||||
chain_pem = crypto_util.dump_pyopenssl_chain(chain_cert)
|
||||
chain_file, abs_chain_path =\
|
||||
_open_pem_file('chain_path', chain_path)
|
||||
fullchain_file, abs_fullchain_path =\
|
||||
_open_pem_file('fullchain_path', fullchain_path)
|
||||
|
||||
chain_file, abs_chain_path =\
|
||||
_open_pem_file('chain_path', chain_path)
|
||||
fullchain_file, abs_fullchain_path =\
|
||||
_open_pem_file('fullchain_path', fullchain_path)
|
||||
_save_chain(chain_pem, chain_file)
|
||||
_save_chain(cert_pem + chain_pem, fullchain_file)
|
||||
|
||||
_save_chain(chain_pem, chain_file)
|
||||
_save_chain(cert_pem + chain_pem, fullchain_file)
|
||||
|
||||
return abs_cert_path, abs_chain_path, abs_fullchain_path
|
||||
return abs_cert_path, abs_chain_path, abs_fullchain_path
|
||||
|
||||
def deploy_certificate(self, domains, privkey_path,
|
||||
cert_path, chain_path, fullchain_path):
|
||||
|
||||
@@ -14,7 +14,6 @@ import six
|
||||
import zope.component
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography import x509
|
||||
import josepy as jose
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
|
||||
@@ -367,16 +366,7 @@ def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
|
||||
"""
|
||||
# XXX: returns empty string when no chain is available, which
|
||||
# shuts up RenewableCert, but might not be the best solution...
|
||||
|
||||
def _dump_cert(cert):
|
||||
if isinstance(cert, jose.ComparableX509):
|
||||
# pylint: disable=protected-access
|
||||
cert = cert.wrapped
|
||||
return OpenSSL.crypto.dump_certificate(filetype, cert)
|
||||
|
||||
# assumes that OpenSSL.crypto.dump_certificate includes ending
|
||||
# newline character
|
||||
return b"".join(_dump_cert(cert) for cert in chain)
|
||||
return acme_crypto_util.dump_pyopenssl_chain(chain, filetype)
|
||||
|
||||
|
||||
def notBefore(cert_path):
|
||||
@@ -443,3 +433,16 @@ def sha256sum(filename):
|
||||
with open(filename, 'rb') as f:
|
||||
sha256.update(f.read())
|
||||
return sha256.hexdigest()
|
||||
|
||||
def cert_and_chain_from_fullchain(fullchain_pem):
|
||||
"""Split fullchain_pem into cert_pem and chain_pem
|
||||
|
||||
:param str fullchain_pem: concatenated cert + chain
|
||||
|
||||
:returns: tuple of string cert_pem and chain_pem
|
||||
:rtype: tuple
|
||||
"""
|
||||
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
|
||||
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem))
|
||||
chain = fullchain_pem[len(cert):]
|
||||
return (cert, chain)
|
||||
|
||||
@@ -1064,13 +1064,13 @@ def _csr_get_and_save_cert(config, le_client):
|
||||
|
||||
"""
|
||||
csr, _ = config.actual_csr
|
||||
certr, chain = le_client.obtain_certificate_from_csr(csr)
|
||||
cert, chain = le_client.obtain_certificate_from_csr(csr)
|
||||
if config.dry_run:
|
||||
logger.debug(
|
||||
"Dry run: skipping saving certificate to %s", config.cert_path)
|
||||
return None, None
|
||||
cert_path, _, fullchain_path = le_client.save_certificate(
|
||||
certr, chain, config.cert_path, config.chain_path, config.fullchain_path)
|
||||
cert, chain, config.cert_path, config.chain_path, config.fullchain_path)
|
||||
return cert_path, fullchain_path
|
||||
|
||||
def renew_cert(config, plugins, lineage):
|
||||
|
||||
@@ -294,15 +294,12 @@ def renew_cert(config, domains, le_client, lineage):
|
||||
_avoid_invalidating_lineage(config, lineage, original_server)
|
||||
if not domains:
|
||||
domains = lineage.names()
|
||||
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains)
|
||||
new_cert, new_chain, new_key, _ = le_client.obtain_certificate(domains)
|
||||
if config.dry_run:
|
||||
logger.debug("Dry run: skipping updating lineage at %s",
|
||||
os.path.dirname(lineage.cert))
|
||||
else:
|
||||
prior_version = lineage.latest_common_version()
|
||||
new_cert = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, new_certr.body.wrapped)
|
||||
new_chain = crypto_util.dump_pyopenssl_chain(new_chain)
|
||||
# TODO: Check return value of save_successor
|
||||
lineage.save_successor(prior_version, new_cert, new_key.pem, new_chain, config)
|
||||
lineage.update_all_links_to(lineage.latest_common_version())
|
||||
|
||||
@@ -4,12 +4,8 @@ import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
import mock
|
||||
|
||||
from acme import errors as acme_errors
|
||||
|
||||
from certbot import account
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
@@ -134,7 +130,10 @@ class ClientTest(ClientTestCommon):
|
||||
self.config.allow_subset_of_names = False
|
||||
self.config.dry_run = False
|
||||
self.eg_domains = ["example.com", "www.example.com"]
|
||||
self.eg_order = mock.MagicMock(authorizations=[None])
|
||||
self.eg_order = mock.MagicMock(
|
||||
authorizations=[None],
|
||||
fullchain_pem=mock.sentinel.fullchain_pem,
|
||||
csr_pem=mock.sentinel.csr_pem)
|
||||
|
||||
def test_init_acme_verify_ssl(self):
|
||||
net = self.acme_client.call_args[0][0]
|
||||
@@ -143,9 +142,9 @@ class ClientTest(ClientTestCommon):
|
||||
def _mock_obtain_certificate(self):
|
||||
self.client.auth_handler = mock.MagicMock()
|
||||
self.client.auth_handler.handle_authorizations.return_value = [None]
|
||||
self.acme.request_issuance.return_value = mock.sentinel.certr
|
||||
self.acme.fetch_chain.return_value = mock.sentinel.chain
|
||||
self.acme.finalize_order.return_value = self.eg_order
|
||||
self.acme.new_order.return_value = self.eg_order
|
||||
self.eg_order.update.return_value = self.eg_order
|
||||
|
||||
def _check_obtain_certificate(self, auth_count=1):
|
||||
if auth_count == 1:
|
||||
@@ -155,27 +154,24 @@ class ClientTest(ClientTestCommon):
|
||||
else:
|
||||
self.assertEqual(self.client.auth_handler.handle_authorizations.call_count, auth_count)
|
||||
|
||||
authzr = self.client.auth_handler.handle_authorizations()
|
||||
|
||||
self.acme.request_issuance.assert_called_once_with(
|
||||
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, CSR_SAN)),
|
||||
authzr)
|
||||
|
||||
self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr)
|
||||
self.acme.finalize_order.assert_called_once_with(
|
||||
self.eg_order, mock.ANY)
|
||||
|
||||
@mock.patch("certbot.client.crypto_util")
|
||||
@mock.patch("certbot.client.logger")
|
||||
@test_util.patch_get_utility()
|
||||
def test_obtain_certificate_from_csr(self, unused_mock_get_utility,
|
||||
mock_logger):
|
||||
mock_logger, mock_crypto_util):
|
||||
self._mock_obtain_certificate()
|
||||
test_csr = util.CSR(form="pem", file=None, data=CSR_SAN)
|
||||
auth_handler = self.client.auth_handler
|
||||
mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
|
||||
mock.sentinel.chain)
|
||||
|
||||
orderr = self.acme.new_order(test_csr.data)
|
||||
auth_handler.handle_authorizations(orderr, False)
|
||||
self.assertEqual(
|
||||
(mock.sentinel.certr, mock.sentinel.chain),
|
||||
(mock.sentinel.cert, mock.sentinel.chain),
|
||||
self.client.obtain_certificate_from_csr(
|
||||
test_csr,
|
||||
orderr=orderr))
|
||||
@@ -184,7 +180,7 @@ class ClientTest(ClientTestCommon):
|
||||
|
||||
# Test for orderr=None
|
||||
self.assertEqual(
|
||||
(mock.sentinel.certr, mock.sentinel.chain),
|
||||
(mock.sentinel.cert, mock.sentinel.chain),
|
||||
self.client.obtain_certificate_from_csr(
|
||||
test_csr,
|
||||
orderr=None))
|
||||
@@ -198,41 +194,13 @@ class ClientTest(ClientTestCommon):
|
||||
test_csr)
|
||||
mock_logger.warning.assert_called_once_with(mock.ANY)
|
||||
|
||||
@test_util.patch_get_utility()
|
||||
def test_obtain_certificate_from_csr_retry_succeeded(
|
||||
self, mock_get_utility):
|
||||
self._mock_obtain_certificate()
|
||||
self.acme.fetch_chain.side_effect = [acme_errors.Error,
|
||||
mock.sentinel.chain]
|
||||
test_csr = util.CSR(form="der", file=None, data=CSR_SAN)
|
||||
|
||||
orderr = self.acme.new_order(test_csr.data)
|
||||
self.assertEqual(
|
||||
(mock.sentinel.certr, mock.sentinel.chain),
|
||||
self.client.obtain_certificate_from_csr(
|
||||
test_csr,
|
||||
orderr=orderr))
|
||||
self.assertEqual(1, mock_get_utility().notification.call_count)
|
||||
|
||||
@test_util.patch_get_utility()
|
||||
def test_obtain_certificate_from_csr_retry_failed(self, mock_get_utility):
|
||||
self._mock_obtain_certificate()
|
||||
self.acme.fetch_chain.side_effect = acme_errors.Error
|
||||
test_csr = util.CSR(form="der", file=None, data=CSR_SAN)
|
||||
|
||||
orderr = self.acme.new_order(test_csr.data)
|
||||
self.assertRaises(
|
||||
acme_errors.Error,
|
||||
self.client.obtain_certificate_from_csr,
|
||||
test_csr,
|
||||
orderr=orderr)
|
||||
self.assertEqual(1, mock_get_utility().notification.call_count)
|
||||
|
||||
@mock.patch("certbot.client.crypto_util")
|
||||
def test_obtain_certificate(self, mock_crypto_util):
|
||||
csr = util.CSR(form="pem", file=None, data=CSR_SAN)
|
||||
mock_crypto_util.init_save_csr.return_value = csr
|
||||
mock_crypto_util.init_save_key.return_value = mock.sentinel.key
|
||||
mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
|
||||
mock.sentinel.chain)
|
||||
|
||||
self._test_obtain_certificate_common(mock.sentinel.key, csr)
|
||||
|
||||
@@ -240,6 +208,8 @@ class ClientTest(ClientTestCommon):
|
||||
self.config.rsa_key_size, self.config.key_dir)
|
||||
mock_crypto_util.init_save_csr.assert_called_once_with(
|
||||
mock.sentinel.key, self.eg_domains, self.config.csr_dir)
|
||||
mock_crypto_util.cert_and_chain_from_fullchain.assert_called_once_with(
|
||||
mock.sentinel.fullchain_pem)
|
||||
|
||||
@mock.patch("certbot.client.crypto_util")
|
||||
@mock.patch("os.remove")
|
||||
@@ -248,6 +218,8 @@ class ClientTest(ClientTestCommon):
|
||||
key = util.CSR(form="pem", file=mock.sentinel.key_file, data=CSR_SAN)
|
||||
mock_crypto_util.init_save_csr.return_value = csr
|
||||
mock_crypto_util.init_save_key.return_value = key
|
||||
mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
|
||||
mock.sentinel.chain)
|
||||
|
||||
authzr = self._authzr_from_domains(["example.com"])
|
||||
self._test_obtain_certificate_common(key, csr, authzr_ret=authzr, auth_count=2)
|
||||
@@ -255,6 +227,7 @@ class ClientTest(ClientTestCommon):
|
||||
self.assertEqual(mock_crypto_util.init_save_key.call_count, 2)
|
||||
self.assertEqual(mock_crypto_util.init_save_csr.call_count, 2)
|
||||
self.assertEqual(mock_remove.call_count, 2)
|
||||
self.assertEqual(mock_crypto_util.cert_and_chain_from_fullchain.call_count, 1)
|
||||
|
||||
@mock.patch("certbot.client.crypto_util")
|
||||
@mock.patch("certbot.client.acme_crypto_util")
|
||||
@@ -263,6 +236,8 @@ class ClientTest(ClientTestCommon):
|
||||
mock_acme_crypto.make_csr.return_value = CSR_SAN
|
||||
mock_crypto.make_key.return_value = mock.sentinel.key_pem
|
||||
key = util.Key(file=None, pem=mock.sentinel.key_pem)
|
||||
mock_crypto.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert,
|
||||
mock.sentinel.chain)
|
||||
|
||||
self.client.config.dry_run = True
|
||||
self._test_obtain_certificate_common(key, csr)
|
||||
@@ -272,6 +247,7 @@ class ClientTest(ClientTestCommon):
|
||||
mock.sentinel.key_pem, self.eg_domains, self.config.must_staple)
|
||||
mock_crypto.init_save_key.assert_not_called()
|
||||
mock_crypto.init_save_csr.assert_not_called()
|
||||
self.assertEqual(mock_crypto.cert_and_chain_from_fullchain.call_count, 1)
|
||||
|
||||
def _authzr_from_domains(self, domains):
|
||||
authzr = []
|
||||
@@ -294,7 +270,6 @@ class ClientTest(ClientTestCommon):
|
||||
authzr = authzr_ret or self._authzr_from_domains(self.eg_domains)
|
||||
|
||||
self.eg_order.authorizations = authzr
|
||||
self.eg_order.update().authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
with test_util.patch_get_utility():
|
||||
@@ -302,13 +277,12 @@ class ClientTest(ClientTestCommon):
|
||||
|
||||
self.assertEqual(
|
||||
result,
|
||||
(mock.sentinel.certr, mock.sentinel.chain, key, csr))
|
||||
(mock.sentinel.cert, mock.sentinel.chain, key, csr))
|
||||
self._check_obtain_certificate(auth_count)
|
||||
|
||||
@mock.patch('certbot.client.Client.obtain_certificate')
|
||||
@mock.patch('certbot.storage.RenewableCert.new_lineage')
|
||||
@mock.patch('OpenSSL.crypto.dump_certificate')
|
||||
def test_obtain_and_enroll_certificate(self, mock_dump_certificate,
|
||||
def test_obtain_and_enroll_certificate(self,
|
||||
mock_storage, mock_obtain_certificate):
|
||||
domains = ["example.com", "www.example.com"]
|
||||
mock_obtain_certificate.return_value = (mock.MagicMock(),
|
||||
@@ -324,7 +298,6 @@ class ClientTest(ClientTestCommon):
|
||||
self.assertFalse(self.client.obtain_and_enroll_certificate(domains, None))
|
||||
|
||||
self.assertTrue(mock_storage.call_count == 2)
|
||||
self.assertTrue(mock_dump_certificate.call_count == 2)
|
||||
|
||||
@mock.patch("certbot.cli.helpful_parser")
|
||||
def test_save_certificate(self, mock_parser):
|
||||
@@ -333,9 +306,8 @@ class ClientTest(ClientTestCommon):
|
||||
tmp_path = tempfile.mkdtemp()
|
||||
os.chmod(tmp_path, 0o755) # TODO: really??
|
||||
|
||||
certr = mock.MagicMock(body=test_util.load_comparable_cert(certs[0]))
|
||||
chain_cert = [test_util.load_comparable_cert(certs[0]),
|
||||
test_util.load_comparable_cert(certs[1])]
|
||||
cert_pem = test_util.load_vector(certs[0])
|
||||
chain_pem = (test_util.load_vector(certs[0]) + test_util.load_vector(certs[1]))
|
||||
candidate_cert_path = os.path.join(tmp_path, "certs", "cert_512.pem")
|
||||
candidate_chain_path = os.path.join(tmp_path, "chains", "chain.pem")
|
||||
candidate_fullchain_path = os.path.join(tmp_path, "chains", "fullchain.pem")
|
||||
@@ -345,7 +317,7 @@ class ClientTest(ClientTestCommon):
|
||||
"--fullchain-path", candidate_fullchain_path]
|
||||
|
||||
cert_path, chain_path, fullchain_path = self.client.save_certificate(
|
||||
certr, chain_cert, candidate_cert_path, candidate_chain_path,
|
||||
cert_pem, chain_pem, candidate_cert_path, candidate_chain_path,
|
||||
candidate_fullchain_path)
|
||||
|
||||
self.assertEqual(os.path.dirname(cert_path),
|
||||
|
||||
@@ -373,5 +373,18 @@ class Sha256sumTest(unittest.TestCase):
|
||||
'914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e')
|
||||
|
||||
|
||||
class CertAndChainFromFullchainTest(unittest.TestCase):
|
||||
"""Tests for certbot.crypto_util.cert_and_chain_from_fullchain"""
|
||||
|
||||
def test_cert_and_chain_from_fullchain(self):
|
||||
cert_pem = CERT
|
||||
chain_pem = CERT + SS_CERT
|
||||
fullchain_pem = cert_pem + chain_pem
|
||||
from certbot.crypto_util import cert_and_chain_from_fullchain
|
||||
cert_out, chain_out = cert_and_chain_from_fullchain(fullchain_pem)
|
||||
self.assertEqual(cert_out, cert_pem)
|
||||
self.assertEqual(chain_out, chain_pem)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
||||
@@ -57,11 +57,6 @@ def load_cert(*names):
|
||||
return OpenSSL.crypto.load_certificate(loader, load_vector(*names))
|
||||
|
||||
|
||||
def load_comparable_cert(*names):
|
||||
"""Load ComparableX509 cert."""
|
||||
return jose.ComparableX509(load_cert(*names))
|
||||
|
||||
|
||||
def load_csr(*names):
|
||||
"""Load certificate request."""
|
||||
loader = _guess_loader(
|
||||
|
||||
Reference in New Issue
Block a user