Compare commits
2 Commits
test-apach
...
legacy_ass
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5b45d0742a | ||
|
|
09b6245817 |
@@ -30,8 +30,10 @@ from certbot.plugins.util import path_surgery
|
||||
from certbot.plugins.enhancements import AutoHSTSEnhancement
|
||||
|
||||
from certbot_apache import apache_util
|
||||
from certbot_apache import assertions
|
||||
from certbot_apache import constants
|
||||
from certbot_apache import display_ops
|
||||
from certbot_apache import dualparser
|
||||
from certbot_apache import http_01
|
||||
from certbot_apache import obj
|
||||
from certbot_apache import parser
|
||||
@@ -204,8 +206,10 @@ class ApacheConfigurator(common.Installer):
|
||||
# These will be set in the prepare function
|
||||
self._prepared = False
|
||||
self.parser = None
|
||||
self.parser_root = None
|
||||
self.version = version
|
||||
self.vhosts = None
|
||||
self.parsernode_vhosts = None
|
||||
self.options = copy.deepcopy(self.OS_DEFAULTS)
|
||||
self._enhance_func = {"redirect": self._enable_redirect,
|
||||
"ensure-http-header": self._set_http_header,
|
||||
@@ -253,11 +257,16 @@ class ApacheConfigurator(common.Installer):
|
||||
# Perform the actual Augeas initialization to be able to react
|
||||
self.parser = self.get_parser()
|
||||
|
||||
# parserv2
|
||||
# initialize ParserNode root
|
||||
self.parser_root = self.get_parsernode_root()
|
||||
|
||||
# Check for errors in parsing files with Augeas
|
||||
self.parser.check_parsing_errors("httpd.aug")
|
||||
|
||||
# Get all of the available vhosts
|
||||
self.vhosts = self.get_virtual_hosts()
|
||||
self.parsernode_vhosts = self.get_virtual_hosts_v2()
|
||||
|
||||
self.install_ssl_options_conf(self.mod_ssl_conf,
|
||||
self.updated_mod_ssl_conf_digest)
|
||||
@@ -348,6 +357,14 @@ class ApacheConfigurator(common.Installer):
|
||||
self.option("server_root"), self.conf("vhost-root"),
|
||||
self.version, configurator=self)
|
||||
|
||||
def get_parsernode_root(self):
|
||||
"""Initializes the ParserNode parser root instance."""
|
||||
return dualparser.DualBlockNode(
|
||||
name=assertions.PASS,
|
||||
ancestor=None,
|
||||
filepath=assertions.PASS
|
||||
)
|
||||
|
||||
def _wildcard_domain(self, domain):
|
||||
"""
|
||||
Checks if domain is a wildcard domain
|
||||
@@ -924,6 +941,89 @@ class ApacheConfigurator(common.Installer):
|
||||
vhs.append(new_vhost)
|
||||
return vhs
|
||||
|
||||
def get_virtual_hosts_v2(self):
|
||||
"""Returns list of virtual hosts found in the Apache configuration using
|
||||
ParserNode interface.
|
||||
|
||||
:returns: List of :class:`~certbot_apache.obj.VirtualHost`
|
||||
objects found in configuration
|
||||
:rtype: list
|
||||
|
||||
"""
|
||||
vhs = []
|
||||
vhosts = self.parser_root.find_blocks("VirtualHost")
|
||||
for vhblock in vhosts:
|
||||
vhs.append(self._create_vhost_v2(vhblock))
|
||||
return vhs
|
||||
|
||||
def _create_vhost_v2(self, node):
|
||||
"""Used by get_virtual_hosts_v2 to create vhost objects using ParserNode
|
||||
interfaces.
|
||||
|
||||
:param interfaces.BlockNode node: The BlockNode object of VirtualHost block
|
||||
:returns: newly created vhost
|
||||
:rtype: :class:`~certbot_apache.obj.VirtualHost`
|
||||
"""
|
||||
addrs = set()
|
||||
for param in node.parameters:
|
||||
addrs.add(obj.Addr.fromstring(param))
|
||||
|
||||
is_ssl = False
|
||||
sslengine = node.find_directives("SSLEngine")
|
||||
if sslengine:
|
||||
for directive in sslengine:
|
||||
# TODO: apache-parser-v2
|
||||
# This search should be made wiser. (using other identificators)
|
||||
try:
|
||||
if directive.parameters[0].lower() == "on":
|
||||
is_ssl = True
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
# "SSLEngine on" might be set outside of <VirtualHost>
|
||||
# Treat vhosts with port 443 as ssl vhosts
|
||||
for addr in addrs:
|
||||
if addr.get_port() == "443":
|
||||
is_ssl = True
|
||||
|
||||
macro = False
|
||||
if node.find_directives("Macro"):
|
||||
macro = True
|
||||
|
||||
vhost_enabled = self.parser.parsed_in_original(node.filepath)
|
||||
|
||||
vhost = obj.VirtualHost(node.filepath, None,
|
||||
addrs, is_ssl, vhost_enabled, modmacro=macro,
|
||||
node=node)
|
||||
|
||||
self._populate_vhost_names_v2(vhost)
|
||||
return vhost
|
||||
|
||||
def _populate_vhost_names_v2(self, vhost):
|
||||
"""Helper function that populates the VirtualHost names.
|
||||
:param host: In progress vhost whose names will be added
|
||||
:type host: :class:`~certbot_apache.obj.VirtualHost`
|
||||
"""
|
||||
|
||||
servername_match = vhost.node.find_directives("ServerName",
|
||||
exclude=False)
|
||||
serveralias_match = vhost.node.find_directives("ServerAlias",
|
||||
exclude=False)
|
||||
|
||||
if servername_match:
|
||||
try:
|
||||
servername = servername_match[-1].parameters[-1]
|
||||
except IndexError:
|
||||
# ServerName directive was found, but didn't contain parameters
|
||||
pass
|
||||
|
||||
if not vhost.modmacro:
|
||||
for alias in serveralias_match:
|
||||
for serveralias in alias.parameters:
|
||||
vhost.aliases.add(serveralias)
|
||||
vhost.name = servername
|
||||
|
||||
|
||||
def is_name_vhost(self, target_addr):
|
||||
"""Returns if vhost is a name based vhost
|
||||
|
||||
|
||||
@@ -124,7 +124,7 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
|
||||
strip_name = re.compile(r"^(?:.+://)?([^ :$]*)")
|
||||
|
||||
def __init__(self, filep, path, addrs, ssl, enabled, name=None,
|
||||
aliases=None, modmacro=False, ancestor=None):
|
||||
aliases=None, modmacro=False, ancestor=None, node=None):
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
"""Initialize a VH."""
|
||||
@@ -137,6 +137,7 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
|
||||
self.enabled = enabled
|
||||
self.modmacro = modmacro
|
||||
self.ancestor = ancestor
|
||||
self.node = node
|
||||
|
||||
def get_names(self):
|
||||
"""Return a set of all names."""
|
||||
|
||||
@@ -2,9 +2,16 @@
|
||||
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from acme.magic_typing import Dict, List, Optional # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot_apache import dualparser
|
||||
from certbot_apache import interfaces
|
||||
from certbot_apache import parsernode_util as util
|
||||
|
||||
from certbot_apache.tests import util as testutil
|
||||
|
||||
|
||||
class DummyParserNode(interfaces.ParserNode):
|
||||
""" A dummy class implementing ParserNode interface """
|
||||
@@ -97,7 +104,7 @@ interfaces.CommentNode.register(DummyCommentNode)
|
||||
interfaces.DirectiveNode.register(DummyDirectiveNode)
|
||||
interfaces.BlockNode.register(DummyBlockNode)
|
||||
|
||||
class ParserNodeTest(unittest.TestCase):
|
||||
class DummyParserNodeTest(unittest.TestCase):
|
||||
"""Dummy placeholder test case for ParserNode interfaces"""
|
||||
|
||||
def test_dummy(self):
|
||||
@@ -120,5 +127,140 @@ class ParserNodeTest(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class ParserNodeTest(testutil.ApacheTest):
|
||||
"""Tests for ParserNode functionalities in ApacheConfigurator"""
|
||||
|
||||
def setUp(self): # pylint: disable=arguments-differ
|
||||
super(ParserNodeTest, self).setUp()
|
||||
|
||||
self.config = testutil.get_apache_configurator(
|
||||
self.config_path, self.vhost_path, self.config_dir, self.work_dir)
|
||||
self.vh_truth = testutil.get_vh_truth(
|
||||
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
|
||||
|
||||
def _get_virtual_hosts_sideeffect(self,
|
||||
vhost_params=("*:80",),
|
||||
filepath="/tmp/something",
|
||||
servername=("pnode.example.org",),
|
||||
serveralias=("pnode2.example.org",),
|
||||
ssl=False,
|
||||
macro=False):
|
||||
""" Gets the mock.side_effect chain for get_virtual_hosts.
|
||||
|
||||
The calls for find_directives occur in order of:
|
||||
- SSLEngine
|
||||
- Macro
|
||||
- ServerName
|
||||
- ServerAlias
|
||||
"""
|
||||
# pylint: disable=too-many-arguments
|
||||
|
||||
ret = dict() # type: Dict[str, List[List[Optional[dualparser.DualNodeBase]]]]
|
||||
ret["blocks"] = []
|
||||
ret["dirs"] = []
|
||||
|
||||
vh_block = DummyBlockNode(
|
||||
name="VirtualHost",
|
||||
parameters=vhost_params,
|
||||
ancestor=self.config.parser_root,
|
||||
filepath=filepath
|
||||
)
|
||||
|
||||
ret["blocks"].append(
|
||||
[dualparser.DualBlockNode(primary=vh_block, secondary=vh_block)]
|
||||
)
|
||||
|
||||
if ssl:
|
||||
ssl_dir = DummyDirectiveNode(
|
||||
name="SSLEngine",
|
||||
parameters=("on",),
|
||||
ancestor=ret["blocks"],
|
||||
filepath=filepath
|
||||
)
|
||||
ret["dirs"].append(
|
||||
[dualparser.DualDirectiveNode(primary=ssl_dir, secondary=ssl_dir)]
|
||||
)
|
||||
else:
|
||||
ret["dirs"].append([])
|
||||
|
||||
if macro:
|
||||
m_dir = DummyDirectiveNode(
|
||||
name="Macro",
|
||||
parameters=("on",),
|
||||
ancestor=ret["blocks"],
|
||||
filepath=filepath
|
||||
)
|
||||
ret["dirs"].append(
|
||||
[dualparser.DualDirectiveNode(primary=m_dir, secondary=m_dir)]
|
||||
)
|
||||
else:
|
||||
ret["dirs"].append([])
|
||||
|
||||
if servername is not None:
|
||||
sn_dir = DummyDirectiveNode(
|
||||
name="ServerName",
|
||||
parameters=servername,
|
||||
ancestor=ret["blocks"],
|
||||
filepath=filepath
|
||||
)
|
||||
ret["dirs"].append(
|
||||
[dualparser.DualDirectiveNode(primary=sn_dir, secondary=sn_dir)]
|
||||
)
|
||||
else: # pragma: no cover
|
||||
ret["dirs"].append([])
|
||||
|
||||
if serveralias is not None:
|
||||
sa_dir = DummyDirectiveNode(
|
||||
name="ServerAlias",
|
||||
parameters=serveralias,
|
||||
ancestor=ret["blocks"],
|
||||
filepath=filepath
|
||||
)
|
||||
ret["dirs"].append(
|
||||
[dualparser.DualDirectiveNode(primary=sa_dir, secondary=sa_dir)]
|
||||
)
|
||||
else: # pragma: no cover
|
||||
ret["dirs"].append([])
|
||||
|
||||
return ret
|
||||
|
||||
def _call_get_vhosts(self, side_effects):
|
||||
dirs = "certbot_apache.dualparser.DualBlockNode.find_directives"
|
||||
blks = "certbot_apache.dualparser.DualBlockNode.find_blocks"
|
||||
with mock.patch(dirs) as mock_dirs:
|
||||
mock_dirs.side_effect = side_effects["dirs"]
|
||||
with mock.patch(blks) as mock_blocks:
|
||||
mock_blocks.side_effect = side_effects["blocks"]
|
||||
return self.config.get_virtual_hosts_v2()
|
||||
|
||||
def test_get_virtual_hosts(self):
|
||||
side_effects = self._get_virtual_hosts_sideeffect()
|
||||
vhosts = self._call_get_vhosts(side_effects)
|
||||
self.assertEqual(vhosts[0].name, "pnode.example.org")
|
||||
self.assertTrue("pnode2.example.org" in vhosts[0].aliases)
|
||||
self.assertEqual(len(vhosts[0].aliases), 1)
|
||||
self.assertFalse(vhosts[0].ssl)
|
||||
self.assertFalse(vhosts[0].modmacro)
|
||||
self.assertEqual(vhosts[0].filep, "/tmp/something")
|
||||
|
||||
def test_get_virtual_hosts_ssl_by_port(self):
|
||||
side_effects = self._get_virtual_hosts_sideeffect(
|
||||
vhost_params=("*:443",))
|
||||
vhosts = self._call_get_vhosts(side_effects)
|
||||
self.assertTrue(vhosts[0].ssl)
|
||||
|
||||
def test_get_virtual_hosts_ssl_by_sslengine(self):
|
||||
side_effects = self._get_virtual_hosts_sideeffect(
|
||||
ssl=True)
|
||||
vhosts = self._call_get_vhosts(side_effects)
|
||||
self.assertTrue(vhosts[0].ssl)
|
||||
|
||||
def test_get_virtual_hosts_modmacro(self):
|
||||
side_effects = self._get_virtual_hosts_sideeffect(
|
||||
macro=True)
|
||||
vhosts = self._call_get_vhosts(side_effects)
|
||||
self.assertTrue(vhosts[0].modmacro)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
||||
Reference in New Issue
Block a user