Compare commits
6 Commits
test-pytho
...
apache_acm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a773f7c755 | ||
|
|
1820b426e7 | ||
|
|
fd68f741a8 | ||
|
|
8755c94aeb | ||
|
|
1b55f94e73 | ||
|
|
20af164bf1 |
@@ -5,6 +5,7 @@ import logging
|
||||
import os
|
||||
import pkg_resources
|
||||
import re
|
||||
import six
|
||||
import socket
|
||||
import time
|
||||
|
||||
@@ -152,6 +153,9 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
self.assoc = dict()
|
||||
# Outstanding challenges
|
||||
self._chall_out = set()
|
||||
# List of vhosts configured per wildcard domain on this run.
|
||||
# used by deploy_cert() and enhance()
|
||||
self._wildcard_vhosts = dict()
|
||||
# Maps enhancements to vhosts we've enabled the enhancement for
|
||||
self._enhanced_vhosts = defaultdict(set)
|
||||
|
||||
@@ -262,6 +266,21 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
self.aug, self.conf("server-root"), self.conf("vhost-root"),
|
||||
self.version, configurator=self)
|
||||
|
||||
def _wildcard_domain(self, domain):
|
||||
"""
|
||||
Checks if domain is a wildcard domain
|
||||
|
||||
:param str domain: Domain to check
|
||||
|
||||
:returns: If the domain is wildcard domain
|
||||
:rtype: bool
|
||||
"""
|
||||
if isinstance(domain, six.text_type):
|
||||
wildcard_marker = u"*."
|
||||
else:
|
||||
wildcard_marker = b"*."
|
||||
return domain.startswith(wildcard_marker)
|
||||
|
||||
def deploy_cert(self, domain, cert_path, key_path,
|
||||
chain_path=None, fullchain_path=None):
|
||||
"""Deploys certificate to specified virtual host.
|
||||
@@ -280,9 +299,112 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
a lack of directives
|
||||
|
||||
"""
|
||||
# Choose vhost before (possible) enabling of mod_ssl, to keep the
|
||||
# vhost choice namespace similar with the pre-validation one.
|
||||
vhost = self.choose_vhost(domain)
|
||||
vhosts = self.choose_vhosts(domain)
|
||||
for vhost in vhosts:
|
||||
self._deploy_cert(vhost, cert_path, key_path, chain_path, fullchain_path)
|
||||
|
||||
def choose_vhosts(self, domain, create_if_no_ssl=True):
|
||||
"""
|
||||
Finds VirtualHosts that can be used with the provided domain
|
||||
|
||||
:param str domain: Domain name to match VirtualHosts to
|
||||
:param bool create_if_no_ssl: If found VirtualHost doesn't have a HTTPS
|
||||
counterpart, should one get created
|
||||
|
||||
:returns: List of VirtualHosts or None
|
||||
:rtype: `list` of :class:`~certbot_apache.obj.VirtualHost`
|
||||
"""
|
||||
|
||||
if self._wildcard_domain(domain):
|
||||
if domain in self._wildcard_vhosts:
|
||||
# Vhosts for a wildcard domain were already selected
|
||||
return self._wildcard_vhosts[domain]
|
||||
# Ask user which VHosts to support.
|
||||
# Returned objects are guaranteed to be ssl vhosts
|
||||
return self._choose_vhosts_wildcard(domain, create_if_no_ssl)
|
||||
else:
|
||||
return [self.choose_vhost(domain)]
|
||||
|
||||
def _vhosts_for_wildcard(self, domain):
|
||||
"""
|
||||
Get VHost objects for every VirtualHost that the user wants to handle
|
||||
with the wildcard certificate.
|
||||
"""
|
||||
|
||||
# Collect all vhosts that match the name
|
||||
matched = set()
|
||||
for vhost in self.vhosts:
|
||||
for name in vhost.get_names():
|
||||
if self._in_wildcard_scope(name, domain):
|
||||
matched.add(vhost)
|
||||
|
||||
return list(matched)
|
||||
|
||||
def _in_wildcard_scope(self, name, domain):
|
||||
"""
|
||||
Helper method for _vhosts_for_wildcard() that makes sure that the domain
|
||||
is in the scope of wildcard domain.
|
||||
|
||||
eg. in scope: domain = *.wild.card, name = 1.wild.card
|
||||
not in scope: domain = *.wild.card, name = 1.2.wild.card
|
||||
"""
|
||||
if len(name.split(".")) == len(domain.split(".")):
|
||||
return fnmatch.fnmatch(name, domain)
|
||||
|
||||
|
||||
def _choose_vhosts_wildcard(self, domain, create_ssl=True):
|
||||
"""Prompts user to choose vhosts to install a wildcard certificate for"""
|
||||
|
||||
# Get all vhosts that are covered by the wildcard domain
|
||||
vhosts = self._vhosts_for_wildcard(domain)
|
||||
|
||||
# Go through the vhosts, making sure that we cover all the names
|
||||
# present, but preferring the SSL vhosts
|
||||
filtered_vhosts = dict()
|
||||
for vhost in vhosts:
|
||||
for name in vhost.get_names():
|
||||
if vhost.ssl:
|
||||
# Always prefer SSL vhosts
|
||||
filtered_vhosts[name] = vhost
|
||||
elif name not in filtered_vhosts and create_ssl:
|
||||
# Add if not in list previously
|
||||
filtered_vhosts[name] = vhost
|
||||
|
||||
# Only unique VHost objects
|
||||
dialog_input = set([vhost for vhost in filtered_vhosts.values()])
|
||||
|
||||
# Ask the user which of names to enable, expect list of names back
|
||||
dialog_output = display_ops.select_vhost_multiple(list(dialog_input))
|
||||
|
||||
if not dialog_output:
|
||||
logger.error(
|
||||
"No vhost exists with servername or alias for domain %s. "
|
||||
"No vhost was selected. Please specify ServerName or ServerAlias "
|
||||
"in the Apache config.",
|
||||
domain)
|
||||
raise errors.PluginError("No vhost selected")
|
||||
|
||||
# Make sure we create SSL vhosts for the ones that are HTTP only
|
||||
# if requested.
|
||||
return_vhosts = list()
|
||||
for vhost in dialog_output:
|
||||
if not vhost.ssl:
|
||||
return_vhosts.append(self.make_vhost_ssl(vhost))
|
||||
else:
|
||||
return_vhosts.append(vhost)
|
||||
|
||||
self._wildcard_vhosts[domain] = return_vhosts
|
||||
return return_vhosts
|
||||
|
||||
|
||||
def _deploy_cert(self, vhost, cert_path, key_path, chain_path, fullchain_path):
|
||||
"""
|
||||
Helper function for deploy_cert() that handles the actual deployment
|
||||
this exists because we might want to do multiple deployments per
|
||||
domain originally passed for deploy_cert(). This is especially true
|
||||
with wildcard certificates
|
||||
"""
|
||||
|
||||
|
||||
# This is done first so that ssl module is enabled and cert_path,
|
||||
# cert_key... can all be parsed appropriately
|
||||
@@ -311,7 +433,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
raise errors.PluginError(
|
||||
"Unable to find cert and/or key directives")
|
||||
|
||||
logger.info("Deploying Certificate for %s to VirtualHost %s", domain, vhost.filep)
|
||||
logger.info("Deploying Certificate to VirtualHost %s", vhost.filep)
|
||||
|
||||
if self.version < (2, 4, 8) or (chain_path and not fullchain_path):
|
||||
# install SSLCertificateFile, SSLCertificateKeyFile,
|
||||
@@ -327,8 +449,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
"version of Apache")
|
||||
else:
|
||||
if not fullchain_path:
|
||||
raise errors.PluginError("Please provide the --fullchain-path\
|
||||
option pointing to your full chain file")
|
||||
raise errors.PluginError("Please provide the --fullchain-path "
|
||||
"option pointing to your full chain file")
|
||||
set_cert_path = fullchain_path
|
||||
self.aug.set(path["cert_path"][-1], fullchain_path)
|
||||
self.aug.set(path["cert_key"][-1], key_path)
|
||||
@@ -391,7 +513,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
logger.error(
|
||||
"No vhost exists with servername or alias of %s. "
|
||||
"No vhost was selected. Please specify ServerName or ServerAlias "
|
||||
"in the Apache config, or split vhosts into separate files.",
|
||||
"in the Apache config.",
|
||||
target_name)
|
||||
raise errors.PluginError("No vhost selected")
|
||||
elif temp:
|
||||
@@ -1376,8 +1498,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
except KeyError:
|
||||
raise errors.PluginError(
|
||||
"Unsupported enhancement: {0}".format(enhancement))
|
||||
|
||||
vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
|
||||
try:
|
||||
func(self.choose_vhost(domain), options)
|
||||
for vhost in vhosts:
|
||||
func(vhost, options)
|
||||
except errors.PluginError:
|
||||
logger.warning("Failed %s for %s", enhancement, domain)
|
||||
raise
|
||||
|
||||
@@ -13,10 +13,44 @@ import certbot.display.util as display_util
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def select_vhost_multiple(vhosts):
|
||||
"""Select multiple Vhosts to install the certificate for
|
||||
|
||||
:param vhosts: Available Apache VirtualHosts
|
||||
:type vhosts: :class:`list` of type `~obj.Vhost`
|
||||
|
||||
:returns: List of VirtualHosts
|
||||
:rtype: :class:`list`of type `~obj.Vhost`
|
||||
"""
|
||||
if not vhosts:
|
||||
return list()
|
||||
tags_list = [vhost.display_repr()+"\n" for vhost in vhosts]
|
||||
# Remove the extra newline from the last entry
|
||||
if len(tags_list):
|
||||
tags_list[-1] = tags_list[-1][:-1]
|
||||
code, names = zope.component.getUtility(interfaces.IDisplay).checklist(
|
||||
"Which VirtualHosts would you like to install the wildcard certificate for?",
|
||||
tags=tags_list, force_interactive=True)
|
||||
if code == display_util.OK:
|
||||
return_vhosts = _reversemap_vhosts(names, vhosts)
|
||||
return return_vhosts
|
||||
return []
|
||||
|
||||
def _reversemap_vhosts(names, vhosts):
|
||||
"""Helper function for select_vhost_multiple for mapping string
|
||||
representations back to actual vhost objects"""
|
||||
return_vhosts = list()
|
||||
|
||||
for selection in names:
|
||||
for vhost in vhosts:
|
||||
if vhost.display_repr().strip() == selection.strip():
|
||||
return_vhosts.append(vhost)
|
||||
return return_vhosts
|
||||
|
||||
def select_vhost(domain, vhosts):
|
||||
"""Select an appropriate Apache Vhost.
|
||||
|
||||
:param vhosts: Available Apache Virtual Hosts
|
||||
:param vhosts: Available Apache VirtualHosts
|
||||
:type vhosts: :class:`list` of type `~obj.Vhost`
|
||||
|
||||
:returns: VirtualHost or `None`
|
||||
@@ -25,13 +59,11 @@ def select_vhost(domain, vhosts):
|
||||
"""
|
||||
if not vhosts:
|
||||
return None
|
||||
while True:
|
||||
code, tag = _vhost_menu(domain, vhosts)
|
||||
if code == display_util.OK:
|
||||
return vhosts[tag]
|
||||
else:
|
||||
return None
|
||||
|
||||
code, tag = _vhost_menu(domain, vhosts)
|
||||
if code == display_util.OK:
|
||||
return vhosts[tag]
|
||||
else:
|
||||
return None
|
||||
|
||||
def _vhost_menu(domain, vhosts):
|
||||
"""Select an appropriate Apache Vhost.
|
||||
|
||||
@@ -167,6 +167,19 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
|
||||
active="Yes" if self.enabled else "No",
|
||||
modmacro="Yes" if self.modmacro else "No"))
|
||||
|
||||
def display_repr(self):
|
||||
"""Return a representation of VHost to be used in dialog"""
|
||||
return (
|
||||
"File: {filename}\n"
|
||||
"Addresses: {addrs}\n"
|
||||
"Names: {names}\n"
|
||||
"HTTPS: {https}\n".format(
|
||||
filename=self.filep,
|
||||
addrs=", ".join(str(addr) for addr in self.addrs),
|
||||
names=", ".join(self.get_names()),
|
||||
https="Yes" if self.ssl else "No"))
|
||||
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.filep == other.filep and self.path == other.path and
|
||||
|
||||
@@ -1337,6 +1337,106 @@ class MultipleVhostsTest(util.ApacheTest):
|
||||
self.config.enable_mod,
|
||||
"whatever")
|
||||
|
||||
def test_wildcard_domain(self):
|
||||
# pylint: disable=protected-access
|
||||
cases = {u"*.example.org": True, b"*.x.example.org": True,
|
||||
u"a.example.org": False, b"a.x.example.org": False}
|
||||
for key in cases.keys():
|
||||
self.assertEqual(self.config._wildcard_domain(key), cases[key])
|
||||
|
||||
def test_choose_vhosts_wildcard(self):
|
||||
# pylint: disable=protected-access
|
||||
mock_path = "certbot_apache.display_ops.select_vhost_multiple"
|
||||
with mock.patch(mock_path) as mock_select_vhs:
|
||||
mock_select_vhs.return_value = [self.vh_truth[3]]
|
||||
vhs = self.config._choose_vhosts_wildcard("*.certbot.demo",
|
||||
create_ssl=True)
|
||||
# Check that the dialog was called with one vh: certbot.demo
|
||||
self.assertEquals(mock_select_vhs.call_args[0][0][0], self.vh_truth[3])
|
||||
self.assertEquals(len(mock_select_vhs.call_args_list), 1)
|
||||
|
||||
# And the actual returned values
|
||||
self.assertEquals(len(vhs), 1)
|
||||
self.assertTrue(vhs[0].name == "certbot.demo")
|
||||
self.assertTrue(vhs[0].ssl)
|
||||
|
||||
self.assertFalse(vhs[0] == self.vh_truth[3])
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.make_vhost_ssl")
|
||||
def test_choose_vhosts_wildcard_no_ssl(self, mock_makessl):
|
||||
# pylint: disable=protected-access
|
||||
mock_path = "certbot_apache.display_ops.select_vhost_multiple"
|
||||
with mock.patch(mock_path) as mock_select_vhs:
|
||||
mock_select_vhs.return_value = [self.vh_truth[1]]
|
||||
vhs = self.config._choose_vhosts_wildcard("*.certbot.demo",
|
||||
create_ssl=False)
|
||||
self.assertFalse(mock_makessl.called)
|
||||
self.assertEquals(vhs[0], self.vh_truth[1])
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._vhosts_for_wildcard")
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.make_vhost_ssl")
|
||||
def test_choose_vhosts_wildcard_already_ssl(self, mock_makessl, mock_vh_for_w):
|
||||
# pylint: disable=protected-access
|
||||
# Already SSL vhost
|
||||
mock_vh_for_w.return_value = [self.vh_truth[7]]
|
||||
mock_path = "certbot_apache.display_ops.select_vhost_multiple"
|
||||
with mock.patch(mock_path) as mock_select_vhs:
|
||||
mock_select_vhs.return_value = [self.vh_truth[7]]
|
||||
vhs = self.config._choose_vhosts_wildcard("whatever",
|
||||
create_ssl=True)
|
||||
self.assertEquals(mock_select_vhs.call_args[0][0][0], self.vh_truth[7])
|
||||
self.assertEquals(len(mock_select_vhs.call_args_list), 1)
|
||||
# Ensure that make_vhost_ssl was not called, vhost.ssl == true
|
||||
self.assertFalse(mock_makessl.called)
|
||||
|
||||
# And the actual returned values
|
||||
self.assertEquals(len(vhs), 1)
|
||||
self.assertTrue(vhs[0].ssl)
|
||||
self.assertEquals(vhs[0], self.vh_truth[7])
|
||||
|
||||
|
||||
def test_deploy_cert_wildcard(self):
|
||||
# pylint: disable=protected-access
|
||||
mock_choose_vhosts = mock.MagicMock()
|
||||
mock_choose_vhosts.return_value = [self.vh_truth[7]]
|
||||
self.config._choose_vhosts_wildcard = mock_choose_vhosts
|
||||
mock_d = "certbot_apache.configurator.ApacheConfigurator._deploy_cert"
|
||||
with mock.patch(mock_d) as mock_dep:
|
||||
self.config.deploy_cert("*.wildcard.example.org", "/tmp/path",
|
||||
"/tmp/path", "/tmp/path", "/tmp/path")
|
||||
self.assertTrue(mock_dep.called)
|
||||
self.assertEquals(len(mock_dep.call_args_list), 1)
|
||||
self.assertEqual(self.vh_truth[7], mock_dep.call_args_list[0][0][0])
|
||||
|
||||
@mock.patch("certbot_apache.display_ops.select_vhost_multiple")
|
||||
def test_deploy_cert_wildcard_no_vhosts(self, mock_dialog):
|
||||
# pylint: disable=protected-access
|
||||
mock_dialog.return_value = []
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.config.deploy_cert,
|
||||
"*.wild.cat", "/tmp/path", "/tmp/path",
|
||||
"/tmp/path", "/tmp/path")
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._choose_vhosts_wildcard")
|
||||
def test_enhance_wildcard_after_install(self, mock_choose):
|
||||
# pylint: disable=protected-access
|
||||
self.config.parser.modules.add("mod_ssl.c")
|
||||
self.config.parser.modules.add("headers_module")
|
||||
self.config._wildcard_vhosts["*.certbot.demo"] = [self.vh_truth[3]]
|
||||
self.config.enhance("*.certbot.demo", "ensure-http-header",
|
||||
"Upgrade-Insecure-Requests")
|
||||
self.assertFalse(mock_choose.called)
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._choose_vhosts_wildcard")
|
||||
def test_enhance_wildcard_no_install(self, mock_choose):
|
||||
mock_choose.return_value = [self.vh_truth[3]]
|
||||
self.config.parser.modules.add("mod_ssl.c")
|
||||
self.config.parser.modules.add("headers_module")
|
||||
self.config.enhance("*.certbot.demo", "ensure-http-header",
|
||||
"Upgrade-Insecure-Requests")
|
||||
self.assertTrue(mock_choose.called)
|
||||
|
||||
|
||||
class AugeasVhostsTest(util.ApacheTest):
|
||||
"""Test vhosts with illegal names dependent on augeas version."""
|
||||
# pylint: disable=protected-access
|
||||
|
||||
@@ -11,9 +11,39 @@ from certbot.tests import util as certbot_util
|
||||
|
||||
from certbot_apache import obj
|
||||
|
||||
from certbot_apache.display_ops import select_vhost_multiple
|
||||
from certbot_apache.tests import util
|
||||
|
||||
|
||||
class SelectVhostMultiTest(unittest.TestCase):
|
||||
"""Tests for certbot_apache.display_ops.select_vhost_multiple."""
|
||||
|
||||
def setUp(self):
|
||||
self.base_dir = "/example_path"
|
||||
self.vhosts = util.get_vh_truth(
|
||||
self.base_dir, "debian_apache_2_4/multiple_vhosts")
|
||||
|
||||
def test_select_no_input(self):
|
||||
self.assertFalse(select_vhost_multiple([]))
|
||||
|
||||
@certbot_util.patch_get_utility()
|
||||
def test_select_correct(self, mock_util):
|
||||
mock_util().checklist.return_value = (
|
||||
display_util.OK, [self.vhosts[3].display_repr(),
|
||||
self.vhosts[2].display_repr()])
|
||||
vhs = select_vhost_multiple([self.vhosts[3],
|
||||
self.vhosts[2],
|
||||
self.vhosts[1]])
|
||||
self.assertTrue(self.vhosts[2] in vhs)
|
||||
self.assertTrue(self.vhosts[3] in vhs)
|
||||
self.assertFalse(self.vhosts[1] in vhs)
|
||||
|
||||
@certbot_util.patch_get_utility()
|
||||
def test_select_cancel(self, mock_util):
|
||||
mock_util().checklist.return_value = (display_util.CANCEL, "whatever")
|
||||
vhs = select_vhost_multiple([self.vhosts[2], self.vhosts[3]])
|
||||
self.assertFalse(vhs)
|
||||
|
||||
class SelectVhostTest(unittest.TestCase):
|
||||
"""Tests for certbot_apache.display_ops.select_vhost."""
|
||||
|
||||
|
||||
@@ -940,7 +940,6 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
self._call,
|
||||
['-d', (('a' * 50) + '.') * 10])
|
||||
|
||||
# Bare IP address (this is actually a different error message now)
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
self._call,
|
||||
|
||||
Reference in New Issue
Block a user