Compare commits

...

2 Commits

Author SHA1 Message Date
Joona Hoikkala
5b45d0742a Merge remote-tracking branch 'origin/apache-parser-v2' into legacy_assert_1 2019-09-26 15:15:34 +03:00
Joona Hoikkala
09b6245817 Implement get_virtual_hosts() for ParserNode 2019-09-25 20:31:17 +03:00
3 changed files with 245 additions and 2 deletions

View File

@@ -30,8 +30,10 @@ from certbot.plugins.util import path_surgery
from certbot.plugins.enhancements import AutoHSTSEnhancement
from certbot_apache import apache_util
from certbot_apache import assertions
from certbot_apache import constants
from certbot_apache import display_ops
from certbot_apache import dualparser
from certbot_apache import http_01
from certbot_apache import obj
from certbot_apache import parser
@@ -204,8 +206,10 @@ class ApacheConfigurator(common.Installer):
# These will be set in the prepare function
self._prepared = False
self.parser = None
self.parser_root = None
self.version = version
self.vhosts = None
self.parsernode_vhosts = None
self.options = copy.deepcopy(self.OS_DEFAULTS)
self._enhance_func = {"redirect": self._enable_redirect,
"ensure-http-header": self._set_http_header,
@@ -253,11 +257,16 @@ class ApacheConfigurator(common.Installer):
# Perform the actual Augeas initialization to be able to react
self.parser = self.get_parser()
# parserv2
# initialize ParserNode root
self.parser_root = self.get_parsernode_root()
# Check for errors in parsing files with Augeas
self.parser.check_parsing_errors("httpd.aug")
# Get all of the available vhosts
self.vhosts = self.get_virtual_hosts()
self.parsernode_vhosts = self.get_virtual_hosts_v2()
self.install_ssl_options_conf(self.mod_ssl_conf,
self.updated_mod_ssl_conf_digest)
@@ -348,6 +357,14 @@ class ApacheConfigurator(common.Installer):
self.option("server_root"), self.conf("vhost-root"),
self.version, configurator=self)
def get_parsernode_root(self):
"""Initializes the ParserNode parser root instance."""
return dualparser.DualBlockNode(
name=assertions.PASS,
ancestor=None,
filepath=assertions.PASS
)
def _wildcard_domain(self, domain):
"""
Checks if domain is a wildcard domain
@@ -924,6 +941,89 @@ class ApacheConfigurator(common.Installer):
vhs.append(new_vhost)
return vhs
def get_virtual_hosts_v2(self):
"""Returns list of virtual hosts found in the Apache configuration using
ParserNode interface.
:returns: List of :class:`~certbot_apache.obj.VirtualHost`
objects found in configuration
:rtype: list
"""
vhs = []
vhosts = self.parser_root.find_blocks("VirtualHost")
for vhblock in vhosts:
vhs.append(self._create_vhost_v2(vhblock))
return vhs
def _create_vhost_v2(self, node):
"""Used by get_virtual_hosts_v2 to create vhost objects using ParserNode
interfaces.
:param interfaces.BlockNode node: The BlockNode object of VirtualHost block
:returns: newly created vhost
:rtype: :class:`~certbot_apache.obj.VirtualHost`
"""
addrs = set()
for param in node.parameters:
addrs.add(obj.Addr.fromstring(param))
is_ssl = False
sslengine = node.find_directives("SSLEngine")
if sslengine:
for directive in sslengine:
# TODO: apache-parser-v2
# This search should be made wiser. (using other identificators)
try:
if directive.parameters[0].lower() == "on":
is_ssl = True
except IndexError:
pass
# "SSLEngine on" might be set outside of <VirtualHost>
# Treat vhosts with port 443 as ssl vhosts
for addr in addrs:
if addr.get_port() == "443":
is_ssl = True
macro = False
if node.find_directives("Macro"):
macro = True
vhost_enabled = self.parser.parsed_in_original(node.filepath)
vhost = obj.VirtualHost(node.filepath, None,
addrs, is_ssl, vhost_enabled, modmacro=macro,
node=node)
self._populate_vhost_names_v2(vhost)
return vhost
def _populate_vhost_names_v2(self, vhost):
"""Helper function that populates the VirtualHost names.
:param host: In progress vhost whose names will be added
:type host: :class:`~certbot_apache.obj.VirtualHost`
"""
servername_match = vhost.node.find_directives("ServerName",
exclude=False)
serveralias_match = vhost.node.find_directives("ServerAlias",
exclude=False)
if servername_match:
try:
servername = servername_match[-1].parameters[-1]
except IndexError:
# ServerName directive was found, but didn't contain parameters
pass
if not vhost.modmacro:
for alias in serveralias_match:
for serveralias in alias.parameters:
vhost.aliases.add(serveralias)
vhost.name = servername
def is_name_vhost(self, target_addr):
"""Returns if vhost is a name based vhost

View File

@@ -124,7 +124,7 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
strip_name = re.compile(r"^(?:.+://)?([^ :$]*)")
def __init__(self, filep, path, addrs, ssl, enabled, name=None,
aliases=None, modmacro=False, ancestor=None):
aliases=None, modmacro=False, ancestor=None, node=None):
# pylint: disable=too-many-arguments
"""Initialize a VH."""
@@ -137,6 +137,7 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
self.enabled = enabled
self.modmacro = modmacro
self.ancestor = ancestor
self.node = node
def get_names(self):
"""Return a set of all names."""

View File

@@ -2,9 +2,16 @@
import unittest
import mock
from acme.magic_typing import Dict, List, Optional # pylint: disable=unused-import, no-name-in-module
from certbot_apache import dualparser
from certbot_apache import interfaces
from certbot_apache import parsernode_util as util
from certbot_apache.tests import util as testutil
class DummyParserNode(interfaces.ParserNode):
""" A dummy class implementing ParserNode interface """
@@ -97,7 +104,7 @@ interfaces.CommentNode.register(DummyCommentNode)
interfaces.DirectiveNode.register(DummyDirectiveNode)
interfaces.BlockNode.register(DummyBlockNode)
class ParserNodeTest(unittest.TestCase):
class DummyParserNodeTest(unittest.TestCase):
"""Dummy placeholder test case for ParserNode interfaces"""
def test_dummy(self):
@@ -120,5 +127,140 @@ class ParserNodeTest(unittest.TestCase):
)
class ParserNodeTest(testutil.ApacheTest):
"""Tests for ParserNode functionalities in ApacheConfigurator"""
def setUp(self): # pylint: disable=arguments-differ
super(ParserNodeTest, self).setUp()
self.config = testutil.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir)
self.vh_truth = testutil.get_vh_truth(
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
def _get_virtual_hosts_sideeffect(self,
vhost_params=("*:80",),
filepath="/tmp/something",
servername=("pnode.example.org",),
serveralias=("pnode2.example.org",),
ssl=False,
macro=False):
""" Gets the mock.side_effect chain for get_virtual_hosts.
The calls for find_directives occur in order of:
- SSLEngine
- Macro
- ServerName
- ServerAlias
"""
# pylint: disable=too-many-arguments
ret = dict() # type: Dict[str, List[List[Optional[dualparser.DualNodeBase]]]]
ret["blocks"] = []
ret["dirs"] = []
vh_block = DummyBlockNode(
name="VirtualHost",
parameters=vhost_params,
ancestor=self.config.parser_root,
filepath=filepath
)
ret["blocks"].append(
[dualparser.DualBlockNode(primary=vh_block, secondary=vh_block)]
)
if ssl:
ssl_dir = DummyDirectiveNode(
name="SSLEngine",
parameters=("on",),
ancestor=ret["blocks"],
filepath=filepath
)
ret["dirs"].append(
[dualparser.DualDirectiveNode(primary=ssl_dir, secondary=ssl_dir)]
)
else:
ret["dirs"].append([])
if macro:
m_dir = DummyDirectiveNode(
name="Macro",
parameters=("on",),
ancestor=ret["blocks"],
filepath=filepath
)
ret["dirs"].append(
[dualparser.DualDirectiveNode(primary=m_dir, secondary=m_dir)]
)
else:
ret["dirs"].append([])
if servername is not None:
sn_dir = DummyDirectiveNode(
name="ServerName",
parameters=servername,
ancestor=ret["blocks"],
filepath=filepath
)
ret["dirs"].append(
[dualparser.DualDirectiveNode(primary=sn_dir, secondary=sn_dir)]
)
else: # pragma: no cover
ret["dirs"].append([])
if serveralias is not None:
sa_dir = DummyDirectiveNode(
name="ServerAlias",
parameters=serveralias,
ancestor=ret["blocks"],
filepath=filepath
)
ret["dirs"].append(
[dualparser.DualDirectiveNode(primary=sa_dir, secondary=sa_dir)]
)
else: # pragma: no cover
ret["dirs"].append([])
return ret
def _call_get_vhosts(self, side_effects):
dirs = "certbot_apache.dualparser.DualBlockNode.find_directives"
blks = "certbot_apache.dualparser.DualBlockNode.find_blocks"
with mock.patch(dirs) as mock_dirs:
mock_dirs.side_effect = side_effects["dirs"]
with mock.patch(blks) as mock_blocks:
mock_blocks.side_effect = side_effects["blocks"]
return self.config.get_virtual_hosts_v2()
def test_get_virtual_hosts(self):
side_effects = self._get_virtual_hosts_sideeffect()
vhosts = self._call_get_vhosts(side_effects)
self.assertEqual(vhosts[0].name, "pnode.example.org")
self.assertTrue("pnode2.example.org" in vhosts[0].aliases)
self.assertEqual(len(vhosts[0].aliases), 1)
self.assertFalse(vhosts[0].ssl)
self.assertFalse(vhosts[0].modmacro)
self.assertEqual(vhosts[0].filep, "/tmp/something")
def test_get_virtual_hosts_ssl_by_port(self):
side_effects = self._get_virtual_hosts_sideeffect(
vhost_params=("*:443",))
vhosts = self._call_get_vhosts(side_effects)
self.assertTrue(vhosts[0].ssl)
def test_get_virtual_hosts_ssl_by_sslengine(self):
side_effects = self._get_virtual_hosts_sideeffect(
ssl=True)
vhosts = self._call_get_vhosts(side_effects)
self.assertTrue(vhosts[0].ssl)
def test_get_virtual_hosts_modmacro(self):
side_effects = self._get_virtual_hosts_sideeffect(
macro=True)
vhosts = self._call_get_vhosts(side_effects)
self.assertTrue(vhosts[0].modmacro)
if __name__ == "__main__":
unittest.main() # pragma: no cover