Compare commits
21 Commits
test-drop-
...
detect-acm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c6911a4679 | ||
|
|
78f3dffb38 | ||
|
|
1f91acc173 | ||
|
|
f44f052159 | ||
|
|
dd4a3f3970 | ||
|
|
3bbae19044 | ||
|
|
3b4551e5b1 | ||
|
|
25605a15a4 | ||
|
|
64c2eccadc | ||
|
|
2cd132be04 | ||
|
|
b3c9ec3ae3 | ||
|
|
e29ca46422 | ||
|
|
f2fb173359 | ||
|
|
9646d37a85 | ||
|
|
3d662e99ad | ||
|
|
b0e85dea12 | ||
|
|
b242005ed7 | ||
|
|
c0245ee6d8 | ||
|
|
3280e17d24 | ||
|
|
3ca4f66138 | ||
|
|
0bbf2ad55a |
@@ -558,6 +558,62 @@ class ClientV2(ClientBase):
|
||||
return self._regr_from_response(response)
|
||||
|
||||
|
||||
class BackwardsCompatibleClientV2(object):
|
||||
"""ACME client wrapper that tends towards V2-style calls, but
|
||||
supports V1 servers.
|
||||
|
||||
:ivar int acme_version: 1 or 2, corresponding to the Let's Encrypt endpoint
|
||||
:ivar .ClientBase client: either Client or ClientV2
|
||||
"""
|
||||
|
||||
def __init__(self, net, key, server):
|
||||
directory = messages.Directory.from_json(net.get(server).json())
|
||||
self.acme_version = self._acme_version_from_directory(directory)
|
||||
if self.acme_version == 1:
|
||||
self.client = Client(directory, key=key, net=net)
|
||||
else:
|
||||
self.client = ClientV2(directory, net=net)
|
||||
|
||||
def __getattr__(self, name):
|
||||
if name in vars(self.client):
|
||||
return getattr(self.client, name)
|
||||
elif name in dir(ClientBase):
|
||||
return getattr(self.client, name)
|
||||
# temporary, for breaking changes into smaller pieces
|
||||
elif name in dir(Client):
|
||||
return getattr(self.client, name)
|
||||
else:
|
||||
raise AttributeError()
|
||||
|
||||
def new_account_and_tos(self, regr, check_tos_cb=None):
|
||||
"""Combined register and agree_tos for V1, new_account for V2
|
||||
|
||||
:param .NewRegistration regr:
|
||||
:param callable check_tos_cb: callback that raises an error if
|
||||
the check does not work
|
||||
"""
|
||||
def _assess_tos(tos):
|
||||
if check_tos_cb is not None:
|
||||
check_tos_cb(tos)
|
||||
if self.acme_version == 1:
|
||||
regr = self.client.register(regr)
|
||||
if regr.terms_of_service is not None:
|
||||
_assess_tos(regr.terms_of_service)
|
||||
return self.client.agree_to_tos(regr)
|
||||
return regr
|
||||
else:
|
||||
if "terms_of_service" in self.client.directory.meta:
|
||||
_assess_tos(self.client.directory.meta.terms_of_service)
|
||||
regr = regr.update(terms_of_service_agreed=True)
|
||||
return self.client.new_account(regr)
|
||||
|
||||
def _acme_version_from_directory(self, directory):
|
||||
if hasattr(directory, 'newNonce'):
|
||||
return 2
|
||||
else:
|
||||
return 1
|
||||
|
||||
|
||||
class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
|
||||
"""Wrapper around requests that signs POSTs for authentication.
|
||||
|
||||
|
||||
@@ -21,10 +21,25 @@ CERT_DER = test_util.load_vector('cert.der')
|
||||
KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
|
||||
KEY2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
|
||||
|
||||
DIRECTORY_V1 = messages.Directory({
|
||||
messages.NewRegistration:
|
||||
'https://www.letsencrypt-demo.org/acme/new-reg',
|
||||
messages.Revocation:
|
||||
'https://www.letsencrypt-demo.org/acme/revoke-cert',
|
||||
messages.NewAuthorization:
|
||||
'https://www.letsencrypt-demo.org/acme/new-authz',
|
||||
messages.CertificateRequest:
|
||||
'https://www.letsencrypt-demo.org/acme/new-cert',
|
||||
})
|
||||
|
||||
class ClientTest(unittest.TestCase):
|
||||
"""Tests for acme.client.Client."""
|
||||
# pylint: disable=too-many-instance-attributes,too-many-public-methods
|
||||
DIRECTORY_V2 = messages.Directory({
|
||||
'newAccount': 'https://www.letsencrypt-demo.org/acme/new-account',
|
||||
'newNonce': 'https://www.letsencrypt-demo.org/acme/new-nonce'
|
||||
})
|
||||
|
||||
|
||||
class ClientTestBase(unittest.TestCase):
|
||||
"""Base for tests in acme.client."""
|
||||
|
||||
def setUp(self):
|
||||
self.response = mock.MagicMock(
|
||||
@@ -33,21 +48,6 @@ class ClientTest(unittest.TestCase):
|
||||
self.net.post.return_value = self.response
|
||||
self.net.get.return_value = self.response
|
||||
|
||||
self.directory = messages.Directory({
|
||||
messages.NewRegistration:
|
||||
'https://www.letsencrypt-demo.org/acme/new-reg',
|
||||
messages.Revocation:
|
||||
'https://www.letsencrypt-demo.org/acme/revoke-cert',
|
||||
messages.NewAuthorization:
|
||||
'https://www.letsencrypt-demo.org/acme/new-authz',
|
||||
messages.CertificateRequest:
|
||||
'https://www.letsencrypt-demo.org/acme/new-cert',
|
||||
})
|
||||
|
||||
from acme.client import Client
|
||||
self.client = Client(
|
||||
directory=self.directory, key=KEY, alg=jose.RS256, net=self.net)
|
||||
|
||||
self.identifier = messages.Identifier(
|
||||
typ=messages.IDENTIFIER_FQDN, value='example.com')
|
||||
|
||||
@@ -84,6 +84,124 @@ class ClientTest(unittest.TestCase):
|
||||
# Reason code for revocation
|
||||
self.rsn = 1
|
||||
|
||||
|
||||
class BackwardsCompatibleClientV2Test(ClientTestBase):
|
||||
"""Tests for acme.client.BackwardsCompatibleClientV2."""
|
||||
|
||||
def _init(self):
|
||||
uri = 'http://www.letsencrypt-demo.org/directory'
|
||||
from acme.client import BackwardsCompatibleClientV2
|
||||
return BackwardsCompatibleClientV2(net=self.net,
|
||||
key=KEY, server=uri)
|
||||
|
||||
def test_init_downloads_directory(self):
|
||||
uri = 'http://www.letsencrypt-demo.org/directory'
|
||||
from acme.client import BackwardsCompatibleClientV2
|
||||
BackwardsCompatibleClientV2(net=self.net,
|
||||
key=KEY, server=uri)
|
||||
self.net.get.assert_called_once_with(uri)
|
||||
|
||||
def test_init_acme_version(self):
|
||||
self.response.json.return_value = DIRECTORY_V1.to_json()
|
||||
client = self._init()
|
||||
self.assertEqual(client.acme_version, 1)
|
||||
|
||||
self.response.json.return_value = DIRECTORY_V2.to_json()
|
||||
client = self._init()
|
||||
self.assertEqual(client.acme_version, 2)
|
||||
|
||||
def test_forwarding(self):
|
||||
self.response.json.return_value = DIRECTORY_V1.to_json()
|
||||
client = self._init()
|
||||
self.assertEqual(client.directory, client.client.directory)
|
||||
self.assertEqual(client.key, KEY)
|
||||
# delete this line once we finish migrating to new API:
|
||||
self.assertEqual(client.register, client.client.register)
|
||||
self.assertEqual(client.update_registration, client.client.update_registration)
|
||||
self.assertRaises(AttributeError, client.__getattr__, 'nonexistent')
|
||||
self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos')
|
||||
self.assertRaises(AttributeError, client.__getattr__, 'new_account')
|
||||
|
||||
def test_new_account_and_tos(self):
|
||||
# v2 no tos
|
||||
self.response.json.return_value = DIRECTORY_V2.to_json()
|
||||
with mock.patch('acme.client.ClientV2') as mock_client:
|
||||
client = self._init()
|
||||
client.new_account_and_tos(self.new_reg)
|
||||
mock_client().new_account.assert_called_with(self.new_reg)
|
||||
|
||||
# v2 tos good
|
||||
with mock.patch('acme.client.ClientV2') as mock_client:
|
||||
mock_client().directory.meta.__contains__.return_value = True
|
||||
client = self._init()
|
||||
client.new_account_and_tos(self.new_reg, lambda x: True)
|
||||
mock_client().new_account.assert_called_with(
|
||||
self.new_reg.update(terms_of_service_agreed=True))
|
||||
|
||||
# v2 tos bad
|
||||
with mock.patch('acme.client.ClientV2') as mock_client:
|
||||
mock_client().directory.meta.__contains__.return_value = True
|
||||
client = self._init()
|
||||
def _tos_cb(tos):
|
||||
raise errors.Error
|
||||
self.assertRaises(errors.Error, client.new_account_and_tos,
|
||||
self.new_reg, _tos_cb)
|
||||
mock_client().new_account.assert_not_called()
|
||||
|
||||
# v1 yes tos
|
||||
self.response.json.return_value = DIRECTORY_V1.to_json()
|
||||
with mock.patch('acme.client.Client') as mock_client:
|
||||
regr = mock.MagicMock(terms_of_service="TOS")
|
||||
mock_client().register.return_value = regr
|
||||
client = self._init()
|
||||
client.new_account_and_tos(self.new_reg)
|
||||
mock_client().register.assert_called_once_with(self.new_reg)
|
||||
mock_client().agree_to_tos.assert_called_once_with(regr)
|
||||
|
||||
# v1 no tos
|
||||
with mock.patch('acme.client.Client') as mock_client:
|
||||
regr = mock.MagicMock(terms_of_service=None)
|
||||
mock_client().register.return_value = regr
|
||||
client = self._init()
|
||||
client.new_account_and_tos(self.new_reg)
|
||||
mock_client().register.assert_called_once_with(self.new_reg)
|
||||
mock_client().agree_to_tos.assert_not_called()
|
||||
|
||||
|
||||
class ClientV2Test(ClientTestBase):
|
||||
"""Tests for acme.client.ClientV2."""
|
||||
# pylint: disable=too-many-instance-attributes,too-many-public-methods
|
||||
|
||||
def setUp(self):
|
||||
super(ClientV2Test, self).setUp()
|
||||
from acme.client import ClientV2
|
||||
self.directory = DIRECTORY_V2
|
||||
self.client = ClientV2(directory=self.directory, net=self.net)
|
||||
|
||||
def test_new_account_v2(self):
|
||||
self.response.status_code = http_client.CREATED
|
||||
self.response.json.return_value = self.regr.body.to_json()
|
||||
self.response.headers['Location'] = self.regr.uri
|
||||
|
||||
self.regr = messages.RegistrationResource(
|
||||
body=messages.Registration(
|
||||
contact=self.contact, key=KEY.public_key()),
|
||||
uri='https://www.letsencrypt-demo.org/acme/reg/1')
|
||||
|
||||
self.assertEqual(self.regr, self.client.new_account(self.regr))
|
||||
|
||||
|
||||
class ClientTest(ClientTestBase):
|
||||
"""Tests for acme.client.Client."""
|
||||
# pylint: disable=too-many-instance-attributes,too-many-public-methods
|
||||
|
||||
def setUp(self):
|
||||
super(ClientTest, self).setUp()
|
||||
from acme.client import Client
|
||||
self.directory = DIRECTORY_V1
|
||||
self.client = Client(
|
||||
directory=self.directory, key=KEY, alg=jose.RS256, net=self.net)
|
||||
|
||||
def test_init_downloads_directory(self):
|
||||
uri = 'http://www.letsencrypt-demo.org/directory'
|
||||
from acme.client import Client
|
||||
@@ -104,23 +222,6 @@ class ClientTest(unittest.TestCase):
|
||||
self.assertEqual(self.regr, self.client.register(self.new_reg))
|
||||
# TODO: test POST call arguments
|
||||
|
||||
def test_new_account_v2(self):
|
||||
directory = messages.Directory({
|
||||
"newAccount": 'https://www.letsencrypt-demo.org/acme/new-account',
|
||||
})
|
||||
from acme.client import ClientV2
|
||||
client = ClientV2(directory, self.net)
|
||||
self.response.status_code = http_client.CREATED
|
||||
self.response.json.return_value = self.regr.body.to_json()
|
||||
self.response.headers['Location'] = self.regr.uri
|
||||
|
||||
self.regr = messages.RegistrationResource(
|
||||
body=messages.Registration(
|
||||
contact=self.contact, key=KEY.public_key()),
|
||||
uri='https://www.letsencrypt-demo.org/acme/reg/1')
|
||||
|
||||
self.assertEqual(self.regr, client.new_account(self.regr))
|
||||
|
||||
def test_update_registration(self):
|
||||
# "Instance of 'Field' has no to_json/update member" bug:
|
||||
# pylint: disable=no-member
|
||||
|
||||
@@ -171,10 +171,31 @@ class Directory(jose.JSONDeSerializable):
|
||||
|
||||
class Meta(jose.JSONObjectWithFields):
|
||||
"""Directory Meta."""
|
||||
terms_of_service = jose.Field('terms-of-service', omitempty=True)
|
||||
_terms_of_service = jose.Field('terms-of-service', omitempty=True)
|
||||
_terms_of_service_v2 = jose.Field('termsOfService', omitempty=True)
|
||||
website = jose.Field('website', omitempty=True)
|
||||
caa_identities = jose.Field('caa-identities', omitempty=True)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
kwargs = dict((self._internal_name(k), v) for k, v in kwargs.items())
|
||||
# pylint: disable=star-args
|
||||
super(Directory.Meta, self).__init__(**kwargs)
|
||||
|
||||
@property
|
||||
def terms_of_service(self):
|
||||
"""URL for the CA TOS"""
|
||||
return self._terms_of_service or self._terms_of_service_v2
|
||||
|
||||
def __iter__(self):
|
||||
# When iterating over fields, use the external name 'terms_of_service' instead of
|
||||
# the internal '_terms_of_service'.
|
||||
for name in super(Directory.Meta, self).__iter__():
|
||||
yield name[1:] if name == '_terms_of_service' else name
|
||||
|
||||
def _internal_name(self, name):
|
||||
return '_' + name if name == 'terms_of_service' else name
|
||||
|
||||
|
||||
@classmethod
|
||||
def _canon_key(cls, key):
|
||||
return getattr(key, 'resource_type', key)
|
||||
@@ -251,7 +272,7 @@ class Registration(ResourceBody):
|
||||
contact = jose.Field('contact', omitempty=True, default=())
|
||||
agreement = jose.Field('agreement', omitempty=True)
|
||||
status = jose.Field('status', omitempty=True)
|
||||
terms_of_service_agreed = jose.Field('terms-of-service-agreed', omitempty=True)
|
||||
terms_of_service_agreed = jose.Field('termsOfServiceAgreed', omitempty=True)
|
||||
|
||||
phone_prefix = 'tel:'
|
||||
email_prefix = 'mailto:'
|
||||
|
||||
@@ -165,6 +165,13 @@ class DirectoryTest(unittest.TestCase):
|
||||
from acme.messages import Directory
|
||||
Directory.from_json({'foo': 'bar'})
|
||||
|
||||
def test_iter_meta(self):
|
||||
result = False
|
||||
for k in self.dir.meta:
|
||||
if k == 'terms_of_service':
|
||||
result = self.dir.meta[k] == 'https://example.com/acme/terms'
|
||||
self.assertTrue(result)
|
||||
|
||||
|
||||
class RegistrationTest(unittest.TestCase):
|
||||
"""Tests for acme.messages.Registration."""
|
||||
|
||||
@@ -37,12 +37,12 @@ from certbot.plugins import selection as plugin_selection
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def acme_from_config_key(config, key):
|
||||
def acme_from_config_key(config, key, regr=None):
|
||||
"Wrangle ACME client construction"
|
||||
# TODO: Allow for other alg types besides RS256
|
||||
net = acme_client.ClientNetwork(key, verify_ssl=(not config.no_verify_ssl),
|
||||
net = acme_client.ClientNetwork(key, account=regr, verify_ssl=(not config.no_verify_ssl),
|
||||
user_agent=determine_user_agent(config))
|
||||
return acme_client.Client(config.server, key=key, net=net)
|
||||
return acme_client.BackwardsCompatibleClientV2(net, key, config.server)
|
||||
|
||||
|
||||
def determine_user_agent(config):
|
||||
@@ -162,14 +162,7 @@ def register(config, account_storage, tos_cb=None):
|
||||
backend=default_backend())))
|
||||
acme = acme_from_config_key(config, key)
|
||||
# TODO: add phone?
|
||||
regr = perform_registration(acme, config)
|
||||
|
||||
if regr.terms_of_service is not None:
|
||||
if tos_cb is not None and not tos_cb(regr):
|
||||
raise errors.Error(
|
||||
"Registration cannot proceed without accepting "
|
||||
"Terms of Service.")
|
||||
regr = acme.agree_to_tos(regr)
|
||||
regr = perform_registration(acme, config, tos_cb)
|
||||
|
||||
acc = account.Account(regr, key)
|
||||
account.report_new_account(config)
|
||||
@@ -180,7 +173,7 @@ def register(config, account_storage, tos_cb=None):
|
||||
return acc, acme
|
||||
|
||||
|
||||
def perform_registration(acme, config):
|
||||
def perform_registration(acme, config, tos_cb):
|
||||
"""
|
||||
Actually register new account, trying repeatedly if there are email
|
||||
problems
|
||||
@@ -192,7 +185,8 @@ def perform_registration(acme, config):
|
||||
:rtype: `acme.messages.RegistrationResource`
|
||||
"""
|
||||
try:
|
||||
return acme.register(messages.NewRegistration.from_data(email=config.email))
|
||||
return acme.new_account_and_tos(messages.NewRegistration.from_data(email=config.email),
|
||||
tos_cb)
|
||||
except messages.Error as e:
|
||||
if e.code == "invalidEmail" or e.code == "invalidContact":
|
||||
if config.noninteractive_mode:
|
||||
@@ -202,7 +196,7 @@ def perform_registration(acme, config):
|
||||
raise errors.Error(msg)
|
||||
else:
|
||||
config.email = display_ops.get_email(invalid=True)
|
||||
return perform_registration(acme, config)
|
||||
return perform_registration(acme, config, tos_cb)
|
||||
else:
|
||||
raise
|
||||
|
||||
@@ -232,7 +226,7 @@ class Client(object):
|
||||
|
||||
# Initialize ACME if account is provided
|
||||
if acme is None and self.account is not None:
|
||||
acme = acme_from_config_key(config, self.account.key)
|
||||
acme = acme_from_config_key(config, self.account.key, self.account.regr)
|
||||
self.acme = acme
|
||||
|
||||
if auth is not None:
|
||||
|
||||
@@ -495,17 +495,20 @@ def _determine_account(config):
|
||||
if config.email is None and not config.register_unsafely_without_email:
|
||||
config.email = display_ops.get_email()
|
||||
|
||||
def _tos_cb(regr):
|
||||
def _tos_cb(terms_of_service):
|
||||
if config.tos:
|
||||
return True
|
||||
msg = ("Please read the Terms of Service at {0}. You "
|
||||
"must agree in order to register with the ACME "
|
||||
"server at {1}".format(
|
||||
regr.terms_of_service, config.server))
|
||||
terms_of_service, config.server))
|
||||
obj = zope.component.getUtility(interfaces.IDisplay)
|
||||
return obj.yesno(msg, "Agree", "Cancel",
|
||||
result = obj.yesno(msg, "Agree", "Cancel",
|
||||
cli_flag="--agree-tos", force_interactive=True)
|
||||
|
||||
if not result:
|
||||
raise errors.Error(
|
||||
"Registration cannot proceed without accepting "
|
||||
"Terms of Service.")
|
||||
try:
|
||||
acc, acme = client.register(
|
||||
config, account_storage, tos_cb=_tos_cb)
|
||||
|
||||
@@ -30,31 +30,27 @@ class RegisterTest(test_util.ConfigTestCase):
|
||||
self.config.register_unsafely_without_email = False
|
||||
self.config.email = "alias@example.com"
|
||||
self.account_storage = account.AccountMemoryStorage()
|
||||
self.tos_cb = mock.MagicMock()
|
||||
|
||||
def _call(self):
|
||||
from certbot.client import register
|
||||
return register(self.config, self.account_storage, self.tos_cb)
|
||||
tos_cb = mock.MagicMock()
|
||||
return register(self.config, self.account_storage, tos_cb)
|
||||
|
||||
def test_no_tos(self):
|
||||
with mock.patch("certbot.client.acme_client.Client") as mock_client:
|
||||
mock_client.register().terms_of_service = "http://tos"
|
||||
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
|
||||
mock_client.new_account_and_tos().terms_of_service = "http://tos"
|
||||
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
|
||||
with mock.patch("certbot.account.report_new_account"):
|
||||
self.tos_cb.return_value = False
|
||||
mock_client().new_account_and_tos.side_effect = errors.Error
|
||||
self.assertRaises(errors.Error, self._call)
|
||||
self.assertFalse(mock_handle.called)
|
||||
|
||||
self.tos_cb.return_value = True
|
||||
mock_client().new_account_and_tos.side_effect = None
|
||||
self._call()
|
||||
self.assertTrue(mock_handle.called)
|
||||
|
||||
self.tos_cb = None
|
||||
self._call()
|
||||
self.assertEqual(mock_handle.call_count, 2)
|
||||
|
||||
def test_it(self):
|
||||
with mock.patch("certbot.client.acme_client.Client"):
|
||||
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2"):
|
||||
with mock.patch("certbot.account.report_new_account"):
|
||||
with mock.patch("certbot.eff.handle_subscription"):
|
||||
self._call()
|
||||
@@ -66,9 +62,9 @@ class RegisterTest(test_util.ConfigTestCase):
|
||||
self.config.noninteractive_mode = False
|
||||
msg = "DNS problem: NXDOMAIN looking up MX for example.com"
|
||||
mx_err = messages.Error.with_code('invalidContact', detail=msg)
|
||||
with mock.patch("certbot.client.acme_client.Client") as mock_client:
|
||||
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
|
||||
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
|
||||
mock_client().register.side_effect = [mx_err, mock.MagicMock()]
|
||||
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
|
||||
self._call()
|
||||
self.assertEqual(mock_get_email.call_count, 1)
|
||||
self.assertTrue(mock_handle.called)
|
||||
@@ -79,9 +75,9 @@ class RegisterTest(test_util.ConfigTestCase):
|
||||
self.config.noninteractive_mode = True
|
||||
msg = "DNS problem: NXDOMAIN looking up MX for example.com"
|
||||
mx_err = messages.Error.with_code('invalidContact', detail=msg)
|
||||
with mock.patch("certbot.client.acme_client.Client") as mock_client:
|
||||
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
|
||||
with mock.patch("certbot.eff.handle_subscription"):
|
||||
mock_client().register.side_effect = [mx_err, mock.MagicMock()]
|
||||
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
|
||||
self.assertRaises(errors.Error, self._call)
|
||||
|
||||
def test_needs_email(self):
|
||||
@@ -91,7 +87,7 @@ class RegisterTest(test_util.ConfigTestCase):
|
||||
@mock.patch("certbot.client.logger")
|
||||
def test_without_email(self, mock_logger):
|
||||
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
|
||||
with mock.patch("certbot.client.acme_client.Client"):
|
||||
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2"):
|
||||
with mock.patch("certbot.account.report_new_account"):
|
||||
self.config.email = None
|
||||
self.config.register_unsafely_without_email = True
|
||||
@@ -104,9 +100,9 @@ class RegisterTest(test_util.ConfigTestCase):
|
||||
from acme import messages
|
||||
msg = "Test"
|
||||
mx_err = messages.Error(detail=msg, typ="malformed", title="title")
|
||||
with mock.patch("certbot.client.acme_client.Client") as mock_client:
|
||||
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
|
||||
with mock.patch("certbot.eff.handle_subscription") as mock_handle:
|
||||
mock_client().register.side_effect = [mx_err, mock.MagicMock()]
|
||||
mock_client().new_account_and_tos.side_effect = [mx_err, mock.MagicMock()]
|
||||
self.assertRaises(messages.Error, self._call)
|
||||
self.assertFalse(mock_handle.called)
|
||||
|
||||
@@ -122,7 +118,7 @@ class ClientTestCommon(test_util.ConfigTestCase):
|
||||
self.account = mock.MagicMock(**{"key.pem": KEY})
|
||||
|
||||
from certbot.client import Client
|
||||
with mock.patch("certbot.client.acme_client.Client") as acme:
|
||||
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as acme:
|
||||
self.acme_client = acme
|
||||
self.acme = acme.return_value = mock.MagicMock()
|
||||
self.client = Client(
|
||||
@@ -140,7 +136,7 @@ class ClientTest(ClientTestCommon):
|
||||
self.eg_domains = ["example.com", "www.example.com"]
|
||||
|
||||
def test_init_acme_verify_ssl(self):
|
||||
net = self.acme_client.call_args[1]["net"]
|
||||
net = self.acme_client.call_args[0][0]
|
||||
self.assertTrue(net.verify_ssl)
|
||||
|
||||
def _mock_obtain_certificate(self):
|
||||
|
||||
@@ -225,7 +225,7 @@ class RevokeTest(test_util.TempDirTestCase):
|
||||
'cert_512.pem'))
|
||||
|
||||
self.patches = [
|
||||
mock.patch('acme.client.Client', autospec=True),
|
||||
mock.patch('acme.client.BackwardsCompatibleClientV2'),
|
||||
mock.patch('certbot.client.Client'),
|
||||
mock.patch('certbot.main._determine_account'),
|
||||
mock.patch('certbot.main.display_ops.success_revocation')
|
||||
@@ -267,7 +267,7 @@ class RevokeTest(test_util.TempDirTestCase):
|
||||
def test_revoke_with_reason(self, mock_acme_client,
|
||||
mock_delete_if_appropriate):
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
mock_revoke = mock_acme_client.Client().revoke
|
||||
mock_revoke = mock_acme_client.BackwardsCompatibleClientV2().revoke
|
||||
expected = []
|
||||
for reason, code in constants.REVOCATION_REASONS.items():
|
||||
self._call("--reason " + reason)
|
||||
@@ -642,10 +642,6 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
self._cli_missing_flag(args, "specify a plugin")
|
||||
args.extend(['--standalone', '-d', 'eg.is'])
|
||||
self._cli_missing_flag(args, "register before running")
|
||||
with mock.patch('certbot.main._get_and_save_cert'):
|
||||
with mock.patch('certbot.main.client.acme_from_config_key'):
|
||||
args.extend(['--email', 'io@io.is'])
|
||||
self._cli_missing_flag(args, "--agree-tos")
|
||||
|
||||
@mock.patch('certbot.main._report_new_cert')
|
||||
@mock.patch('certbot.main.client.acme_client.Client')
|
||||
@@ -674,7 +670,8 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
ua = "bandersnatch"
|
||||
args += ["--user-agent", ua]
|
||||
self._call_no_clientmock(args)
|
||||
acme_net.assert_called_once_with(mock.ANY, verify_ssl=True, user_agent=ua)
|
||||
acme_net.assert_called_once_with(mock.ANY, account=mock.ANY, verify_ssl=True,
|
||||
user_agent=ua)
|
||||
|
||||
@mock.patch('certbot.main.plug_sel.record_chosen_plugins')
|
||||
@mock.patch('certbot.main.plug_sel.pick_installer')
|
||||
@@ -1263,11 +1260,11 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH,
|
||||
'--server', server, 'revoke'])
|
||||
with open(RSA2048_KEY_PATH, 'rb') as f:
|
||||
mock_acme_client.Client.assert_called_once_with(
|
||||
server, key=jose.JWK.load(f.read()), net=mock.ANY)
|
||||
mock_acme_client.BackwardsCompatibleClientV2.assert_called_once_with(
|
||||
mock.ANY, jose.JWK.load(f.read()), server)
|
||||
with open(SS_CERT_PATH, 'rb') as f:
|
||||
cert = crypto_util.pyopenssl_load_certificate(f.read())[0]
|
||||
mock_revoke = mock_acme_client.Client().revoke
|
||||
mock_revoke = mock_acme_client.BackwardsCompatibleClientV2().revoke
|
||||
mock_revoke.assert_called_once_with(
|
||||
jose.ComparableX509(cert),
|
||||
mock.ANY)
|
||||
|
||||
@@ -340,7 +340,7 @@ class ConfigTestCase(TempDirTestCase):
|
||||
self.config.cert_path = constants.CLI_DEFAULTS['auth_cert_path']
|
||||
self.config.fullchain_path = constants.CLI_DEFAULTS['auth_chain_path']
|
||||
self.config.chain_path = constants.CLI_DEFAULTS['auth_chain_path']
|
||||
self.config.server = "example.com"
|
||||
self.config.server = "https://example.com"
|
||||
|
||||
def lock_and_call(func, lock_path):
|
||||
"""Grab a lock for lock_path and call func.
|
||||
|
||||
Reference in New Issue
Block a user