Compare commits
9 Commits
docs-insta
...
test-apach
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57a8401369 | ||
|
|
f0f0b4db08 | ||
|
|
8840618c2d | ||
|
|
d1097d476f | ||
|
|
2ef0a702c9 | ||
|
|
f62eabc94e | ||
|
|
b1c5362929 | ||
|
|
2c496138bc | ||
|
|
f962a3c613 |
@@ -4,6 +4,13 @@ import fnmatch
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
from typing import Tuple
|
||||
|
||||
import pkg_resources
|
||||
|
||||
@@ -14,7 +21,7 @@ from certbot.compat import os
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_mod_deps(mod_name):
|
||||
def get_mod_deps(mod_name: str) -> Any:
|
||||
"""Get known module dependencies.
|
||||
|
||||
.. note:: This does not need to be accurate in order for the client to
|
||||
@@ -33,7 +40,7 @@ def get_mod_deps(mod_name):
|
||||
return deps.get(mod_name, [])
|
||||
|
||||
|
||||
def get_file_path(vhost_path):
|
||||
def get_file_path(vhost_path: str) -> Optional[str]:
|
||||
"""Get file path from augeas_vhost_path.
|
||||
|
||||
Takes in Augeas path and returns the file name
|
||||
@@ -50,7 +57,7 @@ def get_file_path(vhost_path):
|
||||
return _split_aug_path(vhost_path)[0]
|
||||
|
||||
|
||||
def get_internal_aug_path(vhost_path):
|
||||
def get_internal_aug_path(vhost_path: str) -> str:
|
||||
"""Get the Augeas path for a vhost with the file path removed.
|
||||
|
||||
:param str vhost_path: Augeas virtual host path
|
||||
@@ -62,7 +69,7 @@ def get_internal_aug_path(vhost_path):
|
||||
return _split_aug_path(vhost_path)[1]
|
||||
|
||||
|
||||
def _split_aug_path(vhost_path):
|
||||
def _split_aug_path(vhost_path: str) -> Tuple[str, str]:
|
||||
"""Splits an Augeas path into a file path and an internal path.
|
||||
|
||||
After removing "/files", this function splits vhost_path into the
|
||||
@@ -76,7 +83,7 @@ def _split_aug_path(vhost_path):
|
||||
"""
|
||||
# Strip off /files
|
||||
file_path = vhost_path[6:]
|
||||
internal_path = []
|
||||
internal_path: List[str] = []
|
||||
|
||||
# Remove components from the end of file_path until it becomes valid
|
||||
while not os.path.exists(file_path):
|
||||
@@ -86,7 +93,7 @@ def _split_aug_path(vhost_path):
|
||||
return file_path, "/".join(reversed(internal_path))
|
||||
|
||||
|
||||
def parse_define_file(filepath, varname):
|
||||
def parse_define_file(filepath: str, varname: str) -> Dict[str, str]:
|
||||
""" Parses Defines from a variable in configuration file
|
||||
|
||||
:param str filepath: Path of file to parse
|
||||
@@ -96,7 +103,7 @@ def parse_define_file(filepath, varname):
|
||||
:rtype: `dict`
|
||||
|
||||
"""
|
||||
return_vars = {}
|
||||
return_vars: Dict[str, str] = {}
|
||||
# Get list of words in the variable
|
||||
a_opts = util.get_var_from_file(varname, filepath).split()
|
||||
for i, v in enumerate(a_opts):
|
||||
@@ -111,19 +118,19 @@ def parse_define_file(filepath, varname):
|
||||
return return_vars
|
||||
|
||||
|
||||
def unique_id():
|
||||
def unique_id() -> str:
|
||||
""" Returns an unique id to be used as a VirtualHost identifier"""
|
||||
return binascii.hexlify(os.urandom(16)).decode("utf-8")
|
||||
|
||||
|
||||
def included_in_paths(filepath, paths):
|
||||
def included_in_paths(filepath: str, paths: Iterable[str]) -> bool:
|
||||
"""
|
||||
Returns true if the filepath is included in the list of paths
|
||||
that may contain full paths or wildcard paths that need to be
|
||||
expanded.
|
||||
|
||||
:param str filepath: Filepath to check
|
||||
:params list paths: List of paths to check against
|
||||
:param list paths: List of paths to check against
|
||||
|
||||
:returns: True if included
|
||||
:rtype: bool
|
||||
@@ -132,7 +139,7 @@ def included_in_paths(filepath, paths):
|
||||
return any(fnmatch.fnmatch(filepath, path) for path in paths)
|
||||
|
||||
|
||||
def parse_defines(apachectl):
|
||||
def parse_defines(apachectl: str) -> Dict[str, str]:
|
||||
"""
|
||||
Gets Defines from httpd process and returns a dictionary of
|
||||
the defined variables.
|
||||
@@ -143,7 +150,7 @@ def parse_defines(apachectl):
|
||||
:rtype: dict
|
||||
"""
|
||||
|
||||
variables = {}
|
||||
variables: Dict[str, str] = {}
|
||||
define_cmd = [apachectl, "-t", "-D",
|
||||
"DUMP_RUN_CFG"]
|
||||
matches = parse_from_subprocess(define_cmd, r"Define: ([^ \n]*)")
|
||||
@@ -161,7 +168,7 @@ def parse_defines(apachectl):
|
||||
return variables
|
||||
|
||||
|
||||
def parse_includes(apachectl):
|
||||
def parse_includes(apachectl: str) -> List[str]:
|
||||
"""
|
||||
Gets Include directives from httpd process and returns a list of
|
||||
their values.
|
||||
@@ -172,11 +179,11 @@ def parse_includes(apachectl):
|
||||
:rtype: list of str
|
||||
"""
|
||||
|
||||
inc_cmd = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
|
||||
inc_cmd: List[str] = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
|
||||
return parse_from_subprocess(inc_cmd, r"\(.*\) (.*)")
|
||||
|
||||
|
||||
def parse_modules(apachectl):
|
||||
def parse_modules(apachectl: str) -> List[str]:
|
||||
"""
|
||||
Get loaded modules from httpd process, and return the list
|
||||
of loaded module names.
|
||||
@@ -191,7 +198,7 @@ def parse_modules(apachectl):
|
||||
return parse_from_subprocess(mod_cmd, r"(.*)_module")
|
||||
|
||||
|
||||
def parse_from_subprocess(command, regexp):
|
||||
def parse_from_subprocess(command: List[str], regexp: str) -> List[str]:
|
||||
"""Get values from stdout of subprocess command
|
||||
|
||||
:param list command: Command to run
|
||||
@@ -205,7 +212,7 @@ def parse_from_subprocess(command, regexp):
|
||||
return re.compile(regexp).findall(stdout)
|
||||
|
||||
|
||||
def _get_runtime_cfg(command):
|
||||
def _get_runtime_cfg(command: Sequence[str]) -> str:
|
||||
"""
|
||||
Get runtime configuration info.
|
||||
|
||||
@@ -241,7 +248,7 @@ def _get_runtime_cfg(command):
|
||||
return stdout
|
||||
|
||||
|
||||
def find_ssl_apache_conf(prefix):
|
||||
def find_ssl_apache_conf(prefix: str) -> str:
|
||||
"""
|
||||
Find a TLS Apache config file in the dedicated storage.
|
||||
:param str prefix: prefix of the TLS Apache config file to find
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
""" apacheconfig implementation of the ParserNode interfaces """
|
||||
from typing import Any
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
from certbot_apache._internal import assertions
|
||||
@@ -13,20 +16,21 @@ class ApacheParserNode(interfaces.ParserNode):
|
||||
by parsing the equivalent configuration text using the apacheconfig library.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(
|
||||
kwargs) # pylint: disable=unused-variable
|
||||
def __init__(self, **kwargs: Any):
|
||||
# pylint: disable=unused-variable
|
||||
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs)
|
||||
super().__init__(**kwargs)
|
||||
self.ancestor = ancestor
|
||||
self.filepath = filepath
|
||||
self.dirty = dirty
|
||||
self.metadata = metadata
|
||||
self._raw = self.metadata["ac_ast"]
|
||||
self.ancestor: str = ancestor
|
||||
self.filepath: str = filepath
|
||||
self.dirty: bool = dirty
|
||||
self.metadata: Any = metadata
|
||||
self._raw: Any = self.metadata["ac_ast"]
|
||||
|
||||
def save(self, msg): # pragma: no cover
|
||||
pass
|
||||
def save(self, msg: str) -> None:
|
||||
pass # pragma: no cover
|
||||
|
||||
def find_ancestors(self, name): # pylint: disable=unused-variable
|
||||
# pylint: disable=unused-variable
|
||||
def find_ancestors(self, name: str) -> List["ApacheBlockNode"]:
|
||||
"""Find ancestor BlockNodes with a given name"""
|
||||
return [ApacheBlockNode(name=assertions.PASS,
|
||||
parameters=assertions.PASS,
|
||||
@@ -38,33 +42,33 @@ class ApacheParserNode(interfaces.ParserNode):
|
||||
class ApacheCommentNode(ApacheParserNode):
|
||||
""" apacheconfig implementation of CommentNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
comment, kwargs = util.commentnode_kwargs(kwargs) # pylint: disable=unused-variable
|
||||
super().__init__(**kwargs)
|
||||
self.comment = comment
|
||||
|
||||
def __eq__(self, other): # pragma: no cover
|
||||
def __eq__(self, other: Any):
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.comment == other.comment and
|
||||
self.dirty == other.dirty and
|
||||
self.ancestor == other.ancestor and
|
||||
self.metadata == other.metadata and
|
||||
self.filepath == other.filepath)
|
||||
return False
|
||||
return False # pragma: no cover
|
||||
|
||||
|
||||
class ApacheDirectiveNode(ApacheParserNode):
|
||||
""" apacheconfig implementation of DirectiveNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
name, parameters, enabled, kwargs = util.directivenode_kwargs(kwargs)
|
||||
super().__init__(**kwargs)
|
||||
self.name = name
|
||||
self.parameters = parameters
|
||||
self.enabled = enabled
|
||||
self.include = None
|
||||
self.name: str = name
|
||||
self.parameters: List[str] = parameters
|
||||
self.enabled: bool = enabled
|
||||
self.include: Optional[str] = None
|
||||
|
||||
def __eq__(self, other): # pragma: no cover
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.name == other.name and
|
||||
self.filepath == other.filepath and
|
||||
@@ -73,21 +77,21 @@ class ApacheDirectiveNode(ApacheParserNode):
|
||||
self.dirty == other.dirty and
|
||||
self.ancestor == other.ancestor and
|
||||
self.metadata == other.metadata)
|
||||
return False
|
||||
return False # pragma: no cover
|
||||
|
||||
def set_parameters(self, _parameters): # pragma: no cover
|
||||
def set_parameters(self, _parameters):
|
||||
"""Sets the parameters for DirectiveNode"""
|
||||
return
|
||||
return # pragma: no cover
|
||||
|
||||
|
||||
class ApacheBlockNode(ApacheDirectiveNode):
|
||||
""" apacheconfig implementation of BlockNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
super().__init__(**kwargs)
|
||||
self.children: Tuple[ApacheParserNode, ...] = ()
|
||||
|
||||
def __eq__(self, other): # pragma: no cover
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.name == other.name and
|
||||
self.filepath == other.filepath and
|
||||
@@ -97,10 +101,12 @@ class ApacheBlockNode(ApacheDirectiveNode):
|
||||
self.dirty == other.dirty and
|
||||
self.ancestor == other.ancestor and
|
||||
self.metadata == other.metadata)
|
||||
return False
|
||||
return False # pragma: no cover
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def add_child_block(self, name, parameters=None, position=None): # pragma: no cover
|
||||
def add_child_block(
|
||||
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
|
||||
) -> "ApacheBlockNode": # pragma: no cover
|
||||
"""Adds a new BlockNode to the sequence of children"""
|
||||
new_block = ApacheBlockNode(name=assertions.PASS,
|
||||
parameters=assertions.PASS,
|
||||
@@ -111,7 +117,9 @@ class ApacheBlockNode(ApacheDirectiveNode):
|
||||
return new_block
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def add_child_directive(self, name, parameters=None, position=None): # pragma: no cover
|
||||
def add_child_directive(
|
||||
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
|
||||
) -> ApacheDirectiveNode: # pragma: no cover
|
||||
"""Adds a new DirectiveNode to the sequence of children"""
|
||||
new_dir = ApacheDirectiveNode(name=assertions.PASS,
|
||||
parameters=assertions.PASS,
|
||||
@@ -122,7 +130,9 @@ class ApacheBlockNode(ApacheDirectiveNode):
|
||||
return new_dir
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def add_child_comment(self, comment="", position=None): # pragma: no cover
|
||||
def add_child_comment(
|
||||
self, name: str, parameters: Optional[int] = None, position: Optional[int] = None
|
||||
) -> ApacheCommentNode: # pragma: no cover
|
||||
|
||||
"""Adds a new CommentNode to the sequence of children"""
|
||||
new_comment = ApacheCommentNode(comment=assertions.PASS,
|
||||
@@ -132,7 +142,8 @@ class ApacheBlockNode(ApacheDirectiveNode):
|
||||
self.children += (new_comment,)
|
||||
return new_comment
|
||||
|
||||
def find_blocks(self, name, exclude=True): # pylint: disable=unused-argument
|
||||
# pylint: disable=unused-argument
|
||||
def find_blocks(self, name, exclude: bool = True) -> List["ApacheBlockNode"]:
|
||||
"""Recursive search of BlockNodes from the sequence of children"""
|
||||
return [ApacheBlockNode(name=assertions.PASS,
|
||||
parameters=assertions.PASS,
|
||||
@@ -140,7 +151,8 @@ class ApacheBlockNode(ApacheDirectiveNode):
|
||||
filepath=assertions.PASS,
|
||||
metadata=self.metadata)]
|
||||
|
||||
def find_directives(self, name, exclude=True): # pylint: disable=unused-argument
|
||||
# pylint: disable=unused-argument
|
||||
def find_directives(self, name: str, exclude: bool = True) -> List[ApacheDirectiveNode]:
|
||||
"""Recursive search of DirectiveNodes from the sequence of children"""
|
||||
return [ApacheDirectiveNode(name=assertions.PASS,
|
||||
parameters=assertions.PASS,
|
||||
@@ -149,22 +161,22 @@ class ApacheBlockNode(ApacheDirectiveNode):
|
||||
metadata=self.metadata)]
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def find_comments(self, comment, exact=False): # pragma: no cover
|
||||
def find_comments(self, comment: str, exact: bool = False) -> List[ApacheCommentNode]:
|
||||
"""Recursive search of DirectiveNodes from the sequence of children"""
|
||||
return [ApacheCommentNode(comment=assertions.PASS,
|
||||
return [ApacheCommentNode(comment=assertions.PASS, # pragma: no cover
|
||||
ancestor=self,
|
||||
filepath=assertions.PASS,
|
||||
metadata=self.metadata)]
|
||||
|
||||
def delete_child(self, child): # pragma: no cover
|
||||
def delete_child(self, child: "ApacheBlockNode") -> None:
|
||||
"""Deletes a ParserNode from the sequence of children"""
|
||||
return
|
||||
return # pragma: no cover
|
||||
|
||||
def unsaved_files(self): # pragma: no cover
|
||||
def unsaved_files(self) -> List[str]:
|
||||
"""Returns a list of unsaved filepaths"""
|
||||
return [assertions.PASS]
|
||||
return [assertions.PASS] # pragma: no cover
|
||||
|
||||
def parsed_paths(self): # pragma: no cover
|
||||
def parsed_paths(self) -> List[str]:
|
||||
"""Returns a list of parsed configuration file paths"""
|
||||
return [assertions.PASS]
|
||||
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
"""Dual parser node assertions"""
|
||||
import fnmatch
|
||||
from typing import Any
|
||||
|
||||
from certbot_apache._internal import interfaces
|
||||
|
||||
PASS = "CERTBOT_PASS_ASSERT"
|
||||
|
||||
|
||||
def assertEqual(first, second):
|
||||
def assertEqual(first: Any, second: Any) -> None:
|
||||
""" Equality assertion """
|
||||
|
||||
if isinstance(first, interfaces.CommentNode):
|
||||
@@ -30,7 +31,8 @@ def assertEqual(first, second):
|
||||
assert first.filepath == second.filepath
|
||||
|
||||
|
||||
def assertEqualComment(first, second): # pragma: no cover
|
||||
# pragma: no cover
|
||||
def assertEqualComment(first: Any, second: Any) -> None:
|
||||
""" Equality assertion for CommentNode """
|
||||
|
||||
assert isinstance(first, interfaces.CommentNode)
|
||||
@@ -40,7 +42,7 @@ def assertEqualComment(first, second): # pragma: no cover
|
||||
assert first.comment == second.comment # type: ignore
|
||||
|
||||
|
||||
def _assertEqualDirectiveComponents(first, second): # pragma: no cover
|
||||
def _assertEqualDirectiveComponents(first: Any, second: Any) -> None: # pragma: no cover
|
||||
""" Handles assertion for instance variables for DirectiveNode and BlockNode"""
|
||||
|
||||
# Enabled value cannot be asserted, because Augeas implementation
|
||||
@@ -53,7 +55,7 @@ def _assertEqualDirectiveComponents(first, second): # pragma: no cover
|
||||
assert first.parameters == second.parameters
|
||||
|
||||
|
||||
def assertEqualDirective(first, second):
|
||||
def assertEqualDirective(first: Any, second: Any) -> None:
|
||||
""" Equality assertion for DirectiveNode """
|
||||
|
||||
assert isinstance(first, interfaces.DirectiveNode)
|
||||
@@ -61,7 +63,7 @@ def assertEqualDirective(first, second):
|
||||
_assertEqualDirectiveComponents(first, second)
|
||||
|
||||
|
||||
def isPass(value): # pragma: no cover
|
||||
def isPass(value) -> bool: # pragma: no cover
|
||||
"""Checks if the value is set to PASS"""
|
||||
if isinstance(value, bool):
|
||||
return True
|
||||
@@ -73,9 +75,9 @@ def isPassDirective(block):
|
||||
|
||||
if isPass(block.name):
|
||||
return True
|
||||
if isPass(block.parameters): # pragma: no cover
|
||||
if isPass(block.parameters): # pragma: no cover
|
||||
return True
|
||||
if isPass(block.filepath): # pragma: no cover
|
||||
if isPass(block.filepath): # pragma: no cover
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -90,7 +92,7 @@ def isPassComment(comment):
|
||||
return False
|
||||
|
||||
|
||||
def isPassNodeList(nodelist): # pragma: no cover
|
||||
def isPassNodeList(nodelist): # pragma: no cover
|
||||
""" Checks if a ParserNode in the nodelist should pass the assertion,
|
||||
this function is used for results of find_* methods. Unimplemented find_*
|
||||
methods should return a sequence containing a single ParserNode instance
|
||||
@@ -115,7 +117,7 @@ def assertEqualSimple(first, second):
|
||||
assert first == second
|
||||
|
||||
|
||||
def isEqualVirtualHost(first, second):
|
||||
def isEqualVirtualHost(first, second) -> bool:
|
||||
"""
|
||||
Checks that two VirtualHost objects are similar. There are some built
|
||||
in differences with the implementations: VirtualHost created by ParserNode
|
||||
|
||||
@@ -64,7 +64,15 @@ Translates over to:
|
||||
"/files/etc/apache2/apache2.conf/bLoCk[1]",
|
||||
]
|
||||
"""
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
||||
from certbot import errors
|
||||
from certbot.compat import os
|
||||
@@ -78,14 +86,16 @@ from certbot_apache._internal import parsernode_util as util
|
||||
class AugeasParserNode(interfaces.ParserNode):
|
||||
""" Augeas implementation of ParserNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs) # pylint: disable=unused-variable
|
||||
def __init__(self, **kwargs: Any):
|
||||
# pylint: disable=unused-variable
|
||||
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs)
|
||||
super().__init__(**kwargs)
|
||||
self.ancestor = ancestor
|
||||
self.filepath = filepath
|
||||
self.dirty = dirty
|
||||
self.metadata = metadata
|
||||
self.parser = self.metadata.get("augeasparser")
|
||||
self.ancestor: str = ancestor
|
||||
self.filepath: str = filepath
|
||||
self.dirty: bool = dirty
|
||||
self.metadata: Dict[str, Any] = metadata
|
||||
self.parser: parser.ApacheParser = cast(parser.ApacheParser,
|
||||
self.metadata.get("augeasparser"))
|
||||
try:
|
||||
if self.metadata["augeaspath"].endswith("/"):
|
||||
raise errors.PluginError(
|
||||
@@ -96,10 +106,10 @@ class AugeasParserNode(interfaces.ParserNode):
|
||||
except KeyError:
|
||||
raise errors.PluginError("Augeas path is required")
|
||||
|
||||
def save(self, msg):
|
||||
def save(self, msg: Iterable[str]) -> None:
|
||||
self.parser.save(msg)
|
||||
|
||||
def find_ancestors(self, name):
|
||||
def find_ancestors(self, name: str) -> List["AugeasBlockNode"]:
|
||||
"""
|
||||
Searches for ancestor BlockNodes with a given name.
|
||||
|
||||
@@ -109,7 +119,7 @@ class AugeasParserNode(interfaces.ParserNode):
|
||||
:rtype: list of AugeasBlockNode
|
||||
"""
|
||||
|
||||
ancestors = []
|
||||
ancestors: List[AugeasBlockNode] = []
|
||||
|
||||
parent = self.metadata["augeaspath"]
|
||||
while True:
|
||||
@@ -124,7 +134,7 @@ class AugeasParserNode(interfaces.ParserNode):
|
||||
|
||||
return ancestors
|
||||
|
||||
def _create_blocknode(self, path):
|
||||
def _create_blocknode(self, path: str) -> "AugeasBlockNode":
|
||||
"""
|
||||
Helper function to create a BlockNode from Augeas path. This is used by
|
||||
AugeasParserNode.find_ancestors and AugeasBlockNode.
|
||||
@@ -132,21 +142,25 @@ class AugeasParserNode(interfaces.ParserNode):
|
||||
|
||||
"""
|
||||
|
||||
name = self._aug_get_name(path)
|
||||
metadata = {"augeasparser": self.parser, "augeaspath": path}
|
||||
name: str = self._aug_get_name(path)
|
||||
metadata: Dict[str, Union[parser.ApacheParser, str]] = {
|
||||
"augeasparser": self.parser, "augeaspath": path
|
||||
}
|
||||
|
||||
# Check if the file was included from the root config or initial state
|
||||
enabled = self.parser.parsed_in_original(
|
||||
apache_util.get_file_path(path)
|
||||
)
|
||||
file_path = apache_util.get_file_path(path)
|
||||
if file_path is None:
|
||||
raise ValueError(f"No file path found for vhost: {path}.") # pragma: no cover
|
||||
|
||||
enabled = self.parser.parsed_in_original(file_path)
|
||||
|
||||
return AugeasBlockNode(name=name,
|
||||
enabled=enabled,
|
||||
ancestor=assertions.PASS,
|
||||
filepath=apache_util.get_file_path(path),
|
||||
filepath=file_path,
|
||||
metadata=metadata)
|
||||
|
||||
def _aug_get_name(self, path):
|
||||
def _aug_get_name(self, path: str) -> str:
|
||||
"""
|
||||
Helper function to get name of a configuration block or variable from path.
|
||||
"""
|
||||
@@ -166,12 +180,12 @@ class AugeasParserNode(interfaces.ParserNode):
|
||||
class AugeasCommentNode(AugeasParserNode):
|
||||
""" Augeas implementation of CommentNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
comment, kwargs = util.commentnode_kwargs(kwargs) # pylint: disable=unused-variable
|
||||
super().__init__(**kwargs)
|
||||
self.comment = comment
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.comment == other.comment and
|
||||
self.filepath == other.filepath and
|
||||
@@ -184,15 +198,15 @@ class AugeasCommentNode(AugeasParserNode):
|
||||
class AugeasDirectiveNode(AugeasParserNode):
|
||||
""" Augeas implementation of DirectiveNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
name, parameters, enabled, kwargs = util.directivenode_kwargs(kwargs)
|
||||
super().__init__(**kwargs)
|
||||
self.name = name
|
||||
self.enabled = enabled
|
||||
self.name: str = name
|
||||
self.enabled: bool = enabled
|
||||
if parameters:
|
||||
self.set_parameters(parameters)
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.name == other.name and
|
||||
self.filepath == other.filepath and
|
||||
@@ -203,7 +217,7 @@ class AugeasDirectiveNode(AugeasParserNode):
|
||||
self.metadata == other.metadata)
|
||||
return False
|
||||
|
||||
def set_parameters(self, parameters):
|
||||
def set_parameters(self, parameters: List[str]):
|
||||
"""
|
||||
Sets parameters of a DirectiveNode or BlockNode object.
|
||||
|
||||
@@ -222,7 +236,7 @@ class AugeasDirectiveNode(AugeasParserNode):
|
||||
self.parser.aug.set(param_path, param)
|
||||
|
||||
@property
|
||||
def parameters(self):
|
||||
def parameters(self) -> Tuple[Optional[str], ...]:
|
||||
"""
|
||||
Fetches the parameters from Augeas tree, ensuring that the sequence always
|
||||
represents the current state
|
||||
@@ -232,7 +246,7 @@ class AugeasDirectiveNode(AugeasParserNode):
|
||||
"""
|
||||
return tuple(self._aug_get_params(self.metadata["augeaspath"]))
|
||||
|
||||
def _aug_get_params(self, path):
|
||||
def _aug_get_params(self, path: str) -> List[Optional[str]]:
|
||||
"""Helper function to get parameters for DirectiveNodes and BlockNodes"""
|
||||
|
||||
arg_paths = self.parser.aug.match(path + "/arg")
|
||||
@@ -242,11 +256,11 @@ class AugeasDirectiveNode(AugeasParserNode):
|
||||
class AugeasBlockNode(AugeasDirectiveNode):
|
||||
""" Augeas implementation of BlockNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
super().__init__(**kwargs)
|
||||
self.children = ()
|
||||
self.children: Tuple["AugeasBlockNode", ...] = ()
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.name == other.name and
|
||||
self.filepath == other.filepath and
|
||||
@@ -259,21 +273,24 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
return False
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def add_child_block(self, name, parameters=None, position=None): # pragma: no cover
|
||||
def add_child_block(
|
||||
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
|
||||
) -> "AugeasBlockNode": # pragma: no cover
|
||||
"""Adds a new BlockNode to the sequence of children"""
|
||||
|
||||
insertpath, realpath, before = self._aug_resolve_child_position(
|
||||
name,
|
||||
position
|
||||
)
|
||||
new_metadata = {"augeasparser": self.parser, "augeaspath": realpath}
|
||||
new_metadata: Dict[str, Any] = {"augeasparser": self.parser, "augeaspath": realpath}
|
||||
|
||||
# Create the new block
|
||||
self.parser.aug.insert(insertpath, name, before)
|
||||
# Check if the file was included from the root config or initial state
|
||||
enabled = self.parser.parsed_in_original(
|
||||
apache_util.get_file_path(realpath)
|
||||
)
|
||||
file_path = apache_util.get_file_path(realpath)
|
||||
if file_path is None:
|
||||
raise errors.Error(f"No file path found for vhost: {realpath}")
|
||||
enabled = self.parser.parsed_in_original(file_path)
|
||||
|
||||
# Parameters will be set at the initialization of the new object
|
||||
return AugeasBlockNode(
|
||||
@@ -281,12 +298,14 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
parameters=parameters,
|
||||
enabled=enabled,
|
||||
ancestor=assertions.PASS,
|
||||
filepath=apache_util.get_file_path(realpath),
|
||||
filepath=file_path,
|
||||
metadata=new_metadata,
|
||||
)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def add_child_directive(self, name, parameters=None, position=None): # pragma: no cover
|
||||
def add_child_directive(
|
||||
self, name: str, parameters=None, position=None
|
||||
) -> "AugeasDirectiveNode": # pragma: no cover
|
||||
"""Adds a new DirectiveNode to the sequence of children"""
|
||||
|
||||
if not parameters:
|
||||
@@ -303,27 +322,32 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
# Set the directive key
|
||||
self.parser.aug.set(realpath, name)
|
||||
# Check if the file was included from the root config or initial state
|
||||
enabled = self.parser.parsed_in_original(
|
||||
apache_util.get_file_path(realpath)
|
||||
)
|
||||
file_path = apache_util.get_file_path(realpath)
|
||||
if file_path is None:
|
||||
raise errors.Error(f"No file path found for vhost: {realpath}")
|
||||
enabled = self.parser.parsed_in_original(file_path)
|
||||
|
||||
return AugeasDirectiveNode(
|
||||
name=name,
|
||||
parameters=parameters,
|
||||
enabled=enabled,
|
||||
ancestor=assertions.PASS,
|
||||
filepath=apache_util.get_file_path(realpath),
|
||||
filepath=file_path,
|
||||
metadata=new_metadata,
|
||||
)
|
||||
|
||||
def add_child_comment(self, comment="", position=None):
|
||||
def add_child_comment(
|
||||
self, comment: str = "", position: Optional[int] = None
|
||||
) -> "AugeasCommentNode":
|
||||
"""Adds a new CommentNode to the sequence of children"""
|
||||
|
||||
insertpath, realpath, before = self._aug_resolve_child_position(
|
||||
"#comment",
|
||||
position
|
||||
)
|
||||
new_metadata = {"augeasparser": self.parser, "augeaspath": realpath}
|
||||
new_metadata: Dict[str, Any] = {
|
||||
"augeasparser": self.parser, "augeaspath": realpath,
|
||||
}
|
||||
|
||||
# Create the new comment
|
||||
self.parser.aug.insert(insertpath, "#comment", before)
|
||||
@@ -337,11 +361,11 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
metadata=new_metadata,
|
||||
)
|
||||
|
||||
def find_blocks(self, name, exclude=True):
|
||||
def find_blocks(self, name: str, exclude: bool = True) -> List["AugeasBlockNode"]:
|
||||
"""Recursive search of BlockNodes from the sequence of children"""
|
||||
|
||||
nodes = []
|
||||
paths = self._aug_find_blocks(name)
|
||||
nodes: List["AugeasBlockNode"] = []
|
||||
paths: Iterable[str] = self._aug_find_blocks(name)
|
||||
if exclude:
|
||||
paths = self.parser.exclude_dirs(paths)
|
||||
for path in paths:
|
||||
@@ -349,7 +373,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
|
||||
return nodes
|
||||
|
||||
def find_directives(self, name, exclude=True):
|
||||
def find_directives(self, name: str, exclude: bool = True) -> List["AugeasDirectiveNode"]:
|
||||
"""Recursive search of DirectiveNodes from the sequence of children"""
|
||||
|
||||
nodes = []
|
||||
@@ -368,14 +392,14 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
|
||||
return nodes
|
||||
|
||||
def find_comments(self, comment):
|
||||
def find_comments(self, comment: str) -> List["AugeasCommentNode"]:
|
||||
"""
|
||||
Recursive search of DirectiveNodes from the sequence of children.
|
||||
|
||||
:param str comment: Comment content to search for.
|
||||
"""
|
||||
|
||||
nodes = []
|
||||
nodes: List["AugeasCommentNode"] = []
|
||||
ownpath = self.metadata.get("augeaspath")
|
||||
|
||||
comments = self.parser.find_comments(comment, start=ownpath)
|
||||
@@ -384,7 +408,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
|
||||
return nodes
|
||||
|
||||
def delete_child(self, child):
|
||||
def delete_child(self, child: "AugeasParserNode") -> None:
|
||||
"""
|
||||
Deletes a ParserNode from the sequence of children, and raises an
|
||||
exception if it's unable to do so.
|
||||
@@ -397,11 +421,11 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
"seem to exist.").format(child.metadata["augeaspath"])
|
||||
)
|
||||
|
||||
def unsaved_files(self):
|
||||
def unsaved_files(self) -> Set[str]:
|
||||
"""Returns a list of unsaved filepaths"""
|
||||
return self.parser.unsaved_files()
|
||||
|
||||
def parsed_paths(self):
|
||||
def parsed_paths(self) -> List[str]:
|
||||
"""
|
||||
Returns a list of file paths that have currently been parsed into the parser
|
||||
tree. The returned list may include paths with wildcard characters, for
|
||||
@@ -412,7 +436,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
:returns: list of file paths of files that have been parsed
|
||||
"""
|
||||
|
||||
res_paths = []
|
||||
res_paths: List[str] = []
|
||||
|
||||
paths = self.parser.existing_paths
|
||||
for directory in paths:
|
||||
@@ -421,7 +445,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
|
||||
return res_paths
|
||||
|
||||
def _create_commentnode(self, path):
|
||||
def _create_commentnode(self, path: str) -> "AugeasCommentNode":
|
||||
"""Helper function to create a CommentNode from Augeas path"""
|
||||
|
||||
comment = self.parser.aug.get(path)
|
||||
@@ -434,14 +458,16 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
filepath=apache_util.get_file_path(path),
|
||||
metadata=metadata)
|
||||
|
||||
def _create_directivenode(self, path):
|
||||
def _create_directivenode(self, path: str) -> "AugeasDirectiveNode":
|
||||
"""Helper function to create a DirectiveNode from Augeas path"""
|
||||
|
||||
name = self.parser.get_arg(path)
|
||||
metadata = {"augeasparser": self.parser, "augeaspath": path}
|
||||
metadata: Dict[str, Union[parser.ApacheParser, str]] = {
|
||||
"augeasparser": self.parser, "augeaspath": path,
|
||||
}
|
||||
|
||||
# Check if the file was included from the root config or initial state
|
||||
enabled = self.parser.parsed_in_original(
|
||||
enabled: bool = self.parser.parsed_in_original(
|
||||
apache_util.get_file_path(path)
|
||||
)
|
||||
return AugeasDirectiveNode(name=name,
|
||||
@@ -450,12 +476,12 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
filepath=apache_util.get_file_path(path),
|
||||
metadata=metadata)
|
||||
|
||||
def _aug_find_blocks(self, name):
|
||||
def _aug_find_blocks(self, name: str) -> Set[str]:
|
||||
"""Helper function to perform a search to Augeas DOM tree to search
|
||||
configuration blocks with a given name"""
|
||||
|
||||
# The code here is modified from configurator.get_virtual_hosts()
|
||||
blk_paths = set()
|
||||
blk_paths: Set[str] = set()
|
||||
for vhost_path in list(self.parser.parser_paths):
|
||||
paths = self.parser.aug.match(
|
||||
("/files%s//*[label()=~regexp('%s')]" %
|
||||
@@ -464,7 +490,8 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
name.lower() in os.path.basename(path).lower()])
|
||||
return blk_paths
|
||||
|
||||
def _aug_resolve_child_position(self, name, position):
|
||||
def _aug_resolve_child_position(
|
||||
self, name: str, position: Optional[int]) -> Tuple[str, str, bool]:
|
||||
"""
|
||||
Helper function that iterates through the immediate children and figures
|
||||
out the insertion path for a new AugeasParserNode.
|
||||
@@ -489,16 +516,16 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
"""
|
||||
|
||||
# Default to appending
|
||||
before = False
|
||||
before: bool = False
|
||||
|
||||
all_children = self.parser.aug.match("{}/*".format(
|
||||
all_children: str = self.parser.aug.match("{}/*".format(
|
||||
self.metadata["augeaspath"])
|
||||
)
|
||||
|
||||
# Calculate resulting_path
|
||||
# Augeas indices start at 1. We use counter to calculate the index to
|
||||
# be used in resulting_path.
|
||||
counter = 1
|
||||
counter: int = 1
|
||||
for i, child in enumerate(all_children):
|
||||
if position is not None and i >= position:
|
||||
# We're not going to insert the new node to an index after this
|
||||
@@ -507,7 +534,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
|
||||
if name == childname:
|
||||
counter += 1
|
||||
|
||||
resulting_path = "{}/{}[{}]".format(
|
||||
resulting_path: str = "{}/{}[{}]".format(
|
||||
self.metadata["augeaspath"],
|
||||
name,
|
||||
counter
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,4 +1,6 @@
|
||||
"""Apache plugin constants."""
|
||||
from typing import List, Dict
|
||||
|
||||
import pkg_resources
|
||||
|
||||
from certbot.compat import os
|
||||
@@ -13,7 +15,7 @@ UPDATED_MOD_SSL_CONF_DIGEST = ".updated-options-ssl-apache-conf-digest.txt"
|
||||
in `certbot.configuration.NamespaceConfig.config_dir`."""
|
||||
|
||||
# NEVER REMOVE A SINGLE HASH FROM THIS LIST UNLESS YOU KNOW EXACTLY WHAT YOU ARE DOING!
|
||||
ALL_SSL_OPTIONS_HASHES = [
|
||||
ALL_SSL_OPTIONS_HASHES: List[str] = [
|
||||
'2086bca02db48daf93468332543c60ac6acdb6f0b58c7bfdf578a5d47092f82a',
|
||||
'4844d36c9a0f587172d9fa10f4f1c9518e3bcfa1947379f155e16a70a728c21a',
|
||||
'5a922826719981c0a234b1fbcd495f3213e49d2519e845ea0748ba513044b65b',
|
||||
@@ -36,39 +38,39 @@ AUGEAS_LENS_DIR = pkg_resources.resource_filename(
|
||||
"certbot_apache", os.path.join("_internal", "augeas_lens"))
|
||||
"""Path to the Augeas lens directory"""
|
||||
|
||||
REWRITE_HTTPS_ARGS = [
|
||||
REWRITE_HTTPS_ARGS: List[str] = [
|
||||
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,NE,R=permanent]"]
|
||||
"""Apache version<2.3.9 rewrite rule arguments used for redirections to
|
||||
https vhost"""
|
||||
|
||||
REWRITE_HTTPS_ARGS_WITH_END = [
|
||||
REWRITE_HTTPS_ARGS_WITH_END: List[str] = [
|
||||
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[END,NE,R=permanent]"]
|
||||
"""Apache version >= 2.3.9 rewrite rule arguments used for redirections to
|
||||
https vhost"""
|
||||
|
||||
OLD_REWRITE_HTTPS_ARGS = [
|
||||
OLD_REWRITE_HTTPS_ARGS: List[List[str]] = [
|
||||
["^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,QSA,R=permanent]"],
|
||||
["^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[END,QSA,R=permanent]"]]
|
||||
|
||||
HSTS_ARGS = ["always", "set", "Strict-Transport-Security",
|
||||
HSTS_ARGS: List[str] = ["always", "set", "Strict-Transport-Security",
|
||||
"\"max-age=31536000\""]
|
||||
"""Apache header arguments for HSTS"""
|
||||
|
||||
UIR_ARGS = ["always", "set", "Content-Security-Policy",
|
||||
"upgrade-insecure-requests"]
|
||||
UIR_ARGS: List[str] = ["always", "set", "Content-Security-Policy", "upgrade-insecure-requests"]
|
||||
|
||||
HEADER_ARGS = {"Strict-Transport-Security": HSTS_ARGS,
|
||||
"Upgrade-Insecure-Requests": UIR_ARGS}
|
||||
HEADER_ARGS: Dict[str, List[str]] = {
|
||||
"Strict-Transport-Security": HSTS_ARGS, "Upgrade-Insecure-Requests": UIR_ARGS,
|
||||
}
|
||||
|
||||
AUTOHSTS_STEPS = [60, 300, 900, 3600, 21600, 43200, 86400]
|
||||
AUTOHSTS_STEPS: List[int] = [60, 300, 900, 3600, 21600, 43200, 86400]
|
||||
"""AutoHSTS increase steps: 1min, 5min, 15min, 1h, 6h, 12h, 24h"""
|
||||
|
||||
AUTOHSTS_PERMANENT = 31536000
|
||||
AUTOHSTS_PERMANENT: int = 31536000
|
||||
"""Value for the last max-age of HSTS"""
|
||||
|
||||
AUTOHSTS_FREQ = 172800
|
||||
AUTOHSTS_FREQ: int = 172800
|
||||
"""Minimum time since last increase to perform a new one: 48h"""
|
||||
|
||||
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
|
||||
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
|
||||
MANAGED_COMMENT: str = "DO NOT REMOVE - Managed by Certbot"
|
||||
MANAGED_COMMENT_ID: str = MANAGED_COMMENT + ", VirtualHost id: {0}"
|
||||
"""Managed by Certbot comments and the VirtualHost identification template"""
|
||||
|
||||
@@ -1,18 +1,23 @@
|
||||
"""Contains UI methods for Apache operations."""
|
||||
import logging
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
from certbot import errors
|
||||
from certbot.compat import os
|
||||
from certbot.display import util as display_util
|
||||
from certbot_apache._internal import obj
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def select_vhost_multiple(vhosts):
|
||||
def select_vhost_multiple(vhosts: Optional[List[obj.VirtualHost]]) -> List[obj.VirtualHost]:
|
||||
"""Select multiple Vhosts to install the certificate for
|
||||
|
||||
:param vhosts: Available Apache VirtualHosts
|
||||
:type vhosts: :class:`list` of type `~obj.Vhost`
|
||||
:type vhosts: :class:`list` of type `~obj.VirtualHost`
|
||||
|
||||
:returns: List of VirtualHosts
|
||||
:rtype: :class:`list`of type `~obj.Vhost`
|
||||
@@ -32,7 +37,7 @@ def select_vhost_multiple(vhosts):
|
||||
return []
|
||||
|
||||
|
||||
def _reversemap_vhosts(names, vhosts):
|
||||
def _reversemap_vhosts(names: Iterable[str], vhosts: List[obj.VirtualHost]):
|
||||
"""Helper function for select_vhost_multiple for mapping string
|
||||
representations back to actual vhost objects"""
|
||||
return_vhosts = []
|
||||
@@ -44,9 +49,10 @@ def _reversemap_vhosts(names, vhosts):
|
||||
return return_vhosts
|
||||
|
||||
|
||||
def select_vhost(domain, vhosts):
|
||||
def select_vhost(domain: str, vhosts: List[obj.VirtualHost]) -> Optional[obj.VirtualHost]:
|
||||
"""Select an appropriate Apache Vhost.
|
||||
|
||||
:param domain: Domain to select
|
||||
:param vhosts: Available Apache VirtualHosts
|
||||
:type vhosts: :class:`list` of type `~obj.Vhost`
|
||||
|
||||
@@ -62,7 +68,7 @@ def select_vhost(domain, vhosts):
|
||||
return None
|
||||
|
||||
|
||||
def _vhost_menu(domain, vhosts):
|
||||
def _vhost_menu(domain: str, vhosts: List[obj.VirtualHost]) -> Tuple[str, int]:
|
||||
"""Select an appropriate Apache Vhost.
|
||||
|
||||
:param vhosts: Available Apache Virtual Hosts
|
||||
|
||||
@@ -1,7 +1,16 @@
|
||||
""" Dual ParserNode implementation """
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
|
||||
from certbot_apache._internal import apacheparser
|
||||
from certbot_apache._internal import assertions
|
||||
from certbot_apache._internal import augeasparser
|
||||
from certbot_apache._internal import interfaces
|
||||
|
||||
|
||||
class DualNodeBase:
|
||||
@@ -9,12 +18,12 @@ class DualNodeBase:
|
||||
base class for dual parser interface classes. This class handles runtime
|
||||
attribute value assertions."""
|
||||
|
||||
def save(self, msg): # pragma: no cover
|
||||
def save(self, msg: str): # pragma: no cover
|
||||
""" Call save for both parsers """
|
||||
self.primary.save(msg)
|
||||
self.secondary.save(msg)
|
||||
|
||||
def __getattr__(self, aname):
|
||||
def __getattr__(self, aname: str) -> Any:
|
||||
""" Attribute value assertion """
|
||||
firstval = getattr(self.primary, aname)
|
||||
secondval = getattr(self.secondary, aname)
|
||||
@@ -28,11 +37,13 @@ class DualNodeBase:
|
||||
assertions.assertEqualSimple(firstval, secondval)
|
||||
return firstval
|
||||
|
||||
def find_ancestors(self, name):
|
||||
def find_ancestors(self, name: str) -> Sequence[interfaces.ParserNode]:
|
||||
""" Traverses the ancestor tree and returns ancestors matching name """
|
||||
return self._find_helper(DualBlockNode, "find_ancestors", name)
|
||||
|
||||
def _find_helper(self, nodeclass, findfunc, search, **kwargs):
|
||||
def _find_helper(
|
||||
self, nodeclass: Callable, findfunc: str, search: str, **kwargs: Any
|
||||
) -> List[apacheparser.ApacheBlockNode]:
|
||||
"""A helper for find_* functions. The function specific attributes should
|
||||
be passed as keyword arguments.
|
||||
|
||||
@@ -75,7 +86,7 @@ class DualNodeBase:
|
||||
class DualCommentNode(DualNodeBase):
|
||||
""" Dual parser implementation of CommentNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
""" This initialization implementation allows ordinary initialization
|
||||
of CommentNode objects as well as creating a DualCommentNode object
|
||||
using precreated or fetched CommentNode objects if provided as optional
|
||||
@@ -107,7 +118,7 @@ class DualCommentNode(DualNodeBase):
|
||||
class DualDirectiveNode(DualNodeBase):
|
||||
""" Dual parser implementation of DirectiveNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
""" This initialization implementation allows ordinary initialization
|
||||
of DirectiveNode objects as well as creating a DualDirectiveNode object
|
||||
using precreated or fetched DirectiveNode objects if provided as optional
|
||||
@@ -118,8 +129,6 @@ class DualDirectiveNode(DualNodeBase):
|
||||
:param DirectiveNode primary: Primary pre-created DirectiveNode, mainly
|
||||
used when creating new DualParser nodes using add_* methods.
|
||||
:param DirectiveNode secondary: Secondary pre-created DirectiveNode
|
||||
|
||||
|
||||
"""
|
||||
|
||||
kwargs.setdefault("primary", None)
|
||||
@@ -132,8 +141,12 @@ class DualDirectiveNode(DualNodeBase):
|
||||
self.primary = primary
|
||||
self.secondary = secondary
|
||||
else:
|
||||
self.primary = augeasparser.AugeasDirectiveNode(**kwargs)
|
||||
self.secondary = apacheparser.ApacheDirectiveNode(**kwargs)
|
||||
self.primary = augeasparser.AugeasDirectiveNode(
|
||||
**kwargs
|
||||
)
|
||||
self.secondary = apacheparser.ApacheDirectiveNode(
|
||||
**kwargs
|
||||
)
|
||||
|
||||
assertions.assertEqual(self.primary, self.secondary)
|
||||
|
||||
@@ -149,7 +162,7 @@ class DualDirectiveNode(DualNodeBase):
|
||||
class DualBlockNode(DualNodeBase):
|
||||
""" Dual parser implementation of BlockNode interface """
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
""" This initialization implementation allows ordinary initialization
|
||||
of BlockNode objects as well as creating a DualBlockNode object
|
||||
using precreated or fetched BlockNode objects if provided as optional
|
||||
@@ -164,8 +177,8 @@ class DualBlockNode(DualNodeBase):
|
||||
|
||||
kwargs.setdefault("primary", None)
|
||||
kwargs.setdefault("secondary", None)
|
||||
primary = kwargs.pop("primary")
|
||||
secondary = kwargs.pop("secondary")
|
||||
primary: Optional[augeasparser.AugeasBlockNode] = kwargs.pop("primary")
|
||||
secondary: Optional[apacheparser.ApacheBlockNode] = kwargs.pop("secondary")
|
||||
|
||||
if primary or secondary:
|
||||
assert primary and secondary
|
||||
@@ -177,7 +190,9 @@ class DualBlockNode(DualNodeBase):
|
||||
|
||||
assertions.assertEqual(self.primary, self.secondary)
|
||||
|
||||
def add_child_block(self, name, parameters=None, position=None):
|
||||
def add_child_block(
|
||||
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
|
||||
) -> "DualBlockNode":
|
||||
""" Creates a new child BlockNode, asserts that both implementations
|
||||
did it in a similar way, and returns a newly created DualBlockNode object
|
||||
encapsulating both of the newly created objects """
|
||||
@@ -187,7 +202,9 @@ class DualBlockNode(DualNodeBase):
|
||||
assertions.assertEqual(primary_new, secondary_new)
|
||||
return DualBlockNode(primary=primary_new, secondary=secondary_new)
|
||||
|
||||
def add_child_directive(self, name, parameters=None, position=None):
|
||||
def add_child_directive(
|
||||
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
|
||||
) -> DualDirectiveNode:
|
||||
""" Creates a new child DirectiveNode, asserts that both implementations
|
||||
did it in a similar way, and returns a newly created DualDirectiveNode
|
||||
object encapsulating both of the newly created objects """
|
||||
@@ -197,17 +214,23 @@ class DualBlockNode(DualNodeBase):
|
||||
assertions.assertEqual(primary_new, secondary_new)
|
||||
return DualDirectiveNode(primary=primary_new, secondary=secondary_new)
|
||||
|
||||
def add_child_comment(self, comment="", position=None):
|
||||
def add_child_comment(
|
||||
self, comment: str = "", position: Optional[int] = None
|
||||
) -> DualCommentNode:
|
||||
""" Creates a new child CommentNode, asserts that both implementations
|
||||
did it in a similar way, and returns a newly created DualCommentNode
|
||||
object encapsulating both of the newly created objects """
|
||||
|
||||
primary_new = self.primary.add_child_comment(comment, position)
|
||||
secondary_new = self.secondary.add_child_comment(comment, position)
|
||||
primary_new = self.primary.add_child_comment(comment=comment, position=position)
|
||||
secondary_new = self.secondary.add_child_comment(name=comment, position=position)
|
||||
assertions.assertEqual(primary_new, secondary_new)
|
||||
return DualCommentNode(primary=primary_new, secondary=secondary_new)
|
||||
|
||||
def _create_matching_list(self, primary_list, secondary_list):
|
||||
def _create_matching_list(
|
||||
self,
|
||||
primary_list: List[interfaces.ParserNode],
|
||||
secondary_list: List[interfaces.ParserNode],
|
||||
) -> List[Tuple[interfaces.ParserNode, interfaces.ParserNode]]:
|
||||
""" Matches the list of primary_list to a list of secondary_list and
|
||||
returns a list of tuples. This is used to create results for find_
|
||||
methods.
|
||||
@@ -234,7 +257,7 @@ class DualBlockNode(DualNodeBase):
|
||||
raise AssertionError("Could not find a matching node.")
|
||||
return matched
|
||||
|
||||
def find_blocks(self, name, exclude=True):
|
||||
def find_blocks(self, name: str, exclude: bool = True) -> List[apacheparser.ApacheBlockNode]:
|
||||
"""
|
||||
Performs a search for BlockNodes using both implementations and does simple
|
||||
checks for results. This is built upon the assumption that unimplemented
|
||||
@@ -246,7 +269,8 @@ class DualBlockNode(DualNodeBase):
|
||||
return self._find_helper(DualBlockNode, "find_blocks", name,
|
||||
exclude=exclude)
|
||||
|
||||
def find_directives(self, name, exclude=True):
|
||||
def find_directives(self, name: str, exclude: bool = True
|
||||
) -> Sequence[apacheparser.ApacheDirectiveNode]:
|
||||
"""
|
||||
Performs a search for DirectiveNodes using both implementations and
|
||||
checks the results. This is built upon the assumption that unimplemented
|
||||
@@ -258,7 +282,7 @@ class DualBlockNode(DualNodeBase):
|
||||
return self._find_helper(DualDirectiveNode, "find_directives", name,
|
||||
exclude=exclude)
|
||||
|
||||
def find_comments(self, comment):
|
||||
def find_comments(self, comment: str) -> Sequence[apacheparser.ApacheParserNode]:
|
||||
"""
|
||||
Performs a search for CommentNodes using both implementations and
|
||||
checks the results. This is built upon the assumption that unimplemented
|
||||
@@ -269,7 +293,7 @@ class DualBlockNode(DualNodeBase):
|
||||
|
||||
return self._find_helper(DualCommentNode, "find_comments", comment)
|
||||
|
||||
def delete_child(self, child):
|
||||
def delete_child(self, child: "DualBlockNode"):
|
||||
"""Deletes a child from the ParserNode implementations. The actual
|
||||
ParserNode implementations are used here directly in order to be able
|
||||
to match a child to the list of children."""
|
||||
@@ -277,7 +301,7 @@ class DualBlockNode(DualNodeBase):
|
||||
self.primary.delete_child(child.primary)
|
||||
self.secondary.delete_child(child.secondary)
|
||||
|
||||
def unsaved_files(self):
|
||||
def unsaved_files(self) -> Set[str]:
|
||||
""" Fetches the list of unsaved file paths and asserts that the lists
|
||||
match """
|
||||
primary_files = self.primary.unsaved_files()
|
||||
@@ -286,7 +310,7 @@ class DualBlockNode(DualNodeBase):
|
||||
|
||||
return primary_files
|
||||
|
||||
def parsed_paths(self):
|
||||
def parsed_paths(self) -> List[str]:
|
||||
"""
|
||||
Returns a list of file paths that have currently been parsed into the parser
|
||||
tree. The returned list may include paths with wildcard characters, for
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
""" Entry point for Apache Plugin """
|
||||
from typing import Callable
|
||||
from typing import Dict
|
||||
|
||||
from certbot import util
|
||||
from certbot_apache._internal import configurator
|
||||
from certbot_apache._internal import override_arch
|
||||
@@ -10,7 +13,7 @@ from certbot_apache._internal import override_gentoo
|
||||
from certbot_apache._internal import override_suse
|
||||
from certbot_apache._internal import override_void
|
||||
|
||||
OVERRIDE_CLASSES = {
|
||||
OVERRIDE_CLASSES: Dict[str, Callable] = {
|
||||
"arch": override_arch.ArchConfigurator,
|
||||
"cloudlinux": override_centos.CentOSConfigurator,
|
||||
"darwin": override_darwin.DarwinConfigurator,
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
"""A class that performs HTTP-01 challenges for Apache"""
|
||||
import errno
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import List
|
||||
from typing import Set
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from acme.challenges import HTTP01Response
|
||||
from certbot import errors
|
||||
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
from certbot.plugins import common
|
||||
@@ -64,7 +67,7 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
"http_challenges")
|
||||
self.moded_vhosts: Set[VirtualHost] = set()
|
||||
|
||||
def perform(self):
|
||||
def perform(self) -> List[KeyAuthorizationAnnotatedChallenge]:
|
||||
"""Perform all HTTP-01 challenges."""
|
||||
if not self.achalls:
|
||||
return []
|
||||
@@ -72,8 +75,7 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
# About to make temporary changes to the config
|
||||
self.configurator.save("Changes before challenge setup", True)
|
||||
|
||||
self.configurator.ensure_listen(str(
|
||||
self.configurator.config.http01_port))
|
||||
self.configurator.ensure_listen(str(self.configurator.config.http01_port))
|
||||
self.prepare_http01_modules()
|
||||
|
||||
responses = self._set_up_challenges()
|
||||
@@ -84,7 +86,7 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
|
||||
return responses
|
||||
|
||||
def prepare_http01_modules(self):
|
||||
def prepare_http01_modules(self) -> None:
|
||||
"""Make sure that we have the needed modules available for http01"""
|
||||
|
||||
if self.configurator.conf("handle-modules"):
|
||||
@@ -97,7 +99,7 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
if mod + "_module" not in self.configurator.parser.modules:
|
||||
self.configurator.enable_mod(mod, temp=True)
|
||||
|
||||
def _mod_config(self):
|
||||
def _mod_config(self) -> None:
|
||||
selected_vhosts: List[VirtualHost] = []
|
||||
http_port = str(self.configurator.config.http01_port)
|
||||
|
||||
@@ -146,7 +148,7 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
with open(self.challenge_conf_post, "w") as new_conf:
|
||||
new_conf.write(config_text_post)
|
||||
|
||||
def _matching_vhosts(self, domain):
|
||||
def _matching_vhosts(self, domain: str) -> List[VirtualHost]:
|
||||
"""Return all VirtualHost objects that have the requested domain name or
|
||||
a wildcard name that would match the domain in ServerName or ServerAlias
|
||||
directive.
|
||||
@@ -160,9 +162,9 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
|
||||
return matching_vhosts
|
||||
|
||||
def _relevant_vhosts(self):
|
||||
def _relevant_vhosts(self) -> List[VirtualHost]:
|
||||
http01_port = str(self.configurator.config.http01_port)
|
||||
relevant_vhosts = []
|
||||
relevant_vhosts: List[VirtualHost] = []
|
||||
for vhost in self.configurator.vhosts:
|
||||
if any(a.is_wildcard() or a.get_port() == http01_port for a in vhost.addrs):
|
||||
if not vhost.ssl:
|
||||
@@ -180,7 +182,7 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
"""Return all VirtualHost objects with no ServerName"""
|
||||
return [vh for vh in self.configurator.vhosts if vh.name is None]
|
||||
|
||||
def _set_up_challenges(self):
|
||||
def _set_up_challenges(self) -> List[HTTP01Response]:
|
||||
if not os.path.isdir(self.challenge_dir):
|
||||
old_umask = filesystem.umask(0o022)
|
||||
try:
|
||||
@@ -198,10 +200,12 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
|
||||
return responses
|
||||
|
||||
def _set_up_challenge(self, achall):
|
||||
def _set_up_challenge(self, achall: KeyAuthorizationAnnotatedChallenge) -> HTTP01Response:
|
||||
response: HTTP01Response
|
||||
validation: Any
|
||||
response, validation = achall.response_and_validation()
|
||||
|
||||
name = os.path.join(self.challenge_dir, achall.chall.encode("token"))
|
||||
name: str = os.path.join(self.challenge_dir, achall.chall.encode("token"))
|
||||
|
||||
self.configurator.reverter.register_file_creation(True, name)
|
||||
with open(name, 'wb') as f:
|
||||
@@ -210,7 +214,7 @@ class ApacheHttp01(common.ChallengePerformer):
|
||||
|
||||
return response
|
||||
|
||||
def _set_up_include_directives(self, vhost):
|
||||
def _set_up_include_directives(self, vhost: VirtualHost) -> None:
|
||||
"""Includes override configuration to the beginning and to the end of
|
||||
VirtualHost. Note that this include isn't added to Augeas search tree"""
|
||||
|
||||
|
||||
@@ -100,6 +100,9 @@ For this reason the internal representation of data should not ignore the case.
|
||||
"""
|
||||
|
||||
import abc
|
||||
from typing import Any
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class ParserNode(metaclass=abc.ABCMeta):
|
||||
@@ -146,7 +149,7 @@ class ParserNode(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
"""
|
||||
Initializes the ParserNode instance, and sets the ParserNode specific
|
||||
instance variables. This is not meant to be used directly, but through
|
||||
@@ -170,7 +173,7 @@ class ParserNode(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def save(self, msg):
|
||||
def save(self, msg: str) -> None:
|
||||
"""
|
||||
Save traverses the children, and attempts to write the AST to disk for
|
||||
all the objects that are marked dirty. The actual operation of course
|
||||
@@ -189,7 +192,7 @@ class ParserNode(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def find_ancestors(self, name):
|
||||
def find_ancestors(self, name: str):
|
||||
"""
|
||||
Traverses the ancestor tree up, searching for BlockNodes with a specific
|
||||
name.
|
||||
@@ -220,7 +223,7 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any):
|
||||
"""
|
||||
Initializes the CommentNode instance and sets its instance variables.
|
||||
|
||||
@@ -238,10 +241,12 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
|
||||
created or changed after the last save. Default: False.
|
||||
:type dirty: bool
|
||||
"""
|
||||
super().__init__(ancestor=kwargs['ancestor'],
|
||||
dirty=kwargs.get('dirty', False),
|
||||
filepath=kwargs['filepath'],
|
||||
metadata=kwargs.get('metadata', {})) # pragma: no cover
|
||||
super().__init__( # pragma: no cover
|
||||
ancestor=kwargs['ancestor'],
|
||||
dirty=kwargs.get('dirty', False),
|
||||
filepath=kwargs['filepath'],
|
||||
metadata=kwargs.get('metadata', {}),
|
||||
)
|
||||
|
||||
|
||||
class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
|
||||
@@ -272,7 +277,7 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: Any) -> None:
|
||||
"""
|
||||
Initializes the DirectiveNode instance and sets its instance variables.
|
||||
|
||||
@@ -302,13 +307,15 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
|
||||
:type enabled: bool
|
||||
|
||||
"""
|
||||
super().__init__(ancestor=kwargs['ancestor'],
|
||||
dirty=kwargs.get('dirty', False),
|
||||
filepath=kwargs['filepath'],
|
||||
metadata=kwargs.get('metadata', {})) # pragma: no cover
|
||||
super().__init__( # pragma: no cover
|
||||
ancestor=kwargs['ancestor'],
|
||||
dirty=kwargs.get('dirty', False),
|
||||
filepath=kwargs['filepath'],
|
||||
metadata=kwargs.get('metadata', {}),
|
||||
)
|
||||
|
||||
@abc.abstractmethod
|
||||
def set_parameters(self, parameters):
|
||||
def set_parameters(self, parameters: List[str]) -> None:
|
||||
"""
|
||||
Sets the sequence of parameters for this ParserNode object without
|
||||
whitespaces. While the whitespaces for parameters are discarded when using
|
||||
@@ -361,7 +368,9 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_child_block(self, name, parameters=None, position=None):
|
||||
def add_child_block(
|
||||
self, name: str, parameters: List[str] = None, position: int = None
|
||||
) -> "BlockNode":
|
||||
"""
|
||||
Adds a new BlockNode child node with provided values and marks the callee
|
||||
BlockNode dirty. This is used to add new children to the AST. The preceding
|
||||
@@ -381,7 +390,9 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_child_directive(self, name, parameters=None, position=None):
|
||||
def add_child_directive(
|
||||
self, name: str, parameters: Optional[List[str]] = None, position: Optional[int] = None
|
||||
) -> "DirectiveNode":
|
||||
"""
|
||||
Adds a new DirectiveNode child node with provided values and marks the
|
||||
callee BlockNode dirty. This is used to add new children to the AST. The
|
||||
@@ -402,7 +413,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_child_comment(self, comment="", position=None):
|
||||
def add_child_comment(self, comment: str = "", position: Optional[int] = None) -> "CommentNode":
|
||||
"""
|
||||
Adds a new CommentNode child node with provided value and marks the
|
||||
callee BlockNode dirty. This is used to add new children to the AST. The
|
||||
@@ -422,7 +433,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def find_blocks(self, name, exclude=True):
|
||||
def find_blocks(self, name: str, exclude: bool = True) -> List["BlockNode"]:
|
||||
"""
|
||||
Find a configuration block by name. This method walks the child tree of
|
||||
ParserNodes under the instance it was called from. This way it is possible
|
||||
@@ -439,7 +450,23 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def find_directives(self, name, exclude=True):
|
||||
def find_comments(self, comment: str) -> List["CommentNode"]:
|
||||
"""
|
||||
Find comments with value containing the search term.
|
||||
|
||||
This method walks the child tree of ParserNodes under the instance it was
|
||||
called from. This way it is possible to search for the whole configuration
|
||||
tree, when starting from root node, or to do a partial search when starting
|
||||
from a specified branch. The lookup should be case sensitive.
|
||||
|
||||
:param str comment: The content of comment to search for
|
||||
|
||||
:returns: A list of found CommentNode objects.
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def find_directives(self, name: str, exclude: bool = True):
|
||||
"""
|
||||
Find a directive by name. This method walks the child tree of ParserNodes
|
||||
under the instance it was called from. This way it is possible to search
|
||||
@@ -457,23 +484,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def find_comments(self, comment):
|
||||
"""
|
||||
Find comments with value containing the search term.
|
||||
|
||||
This method walks the child tree of ParserNodes under the instance it was
|
||||
called from. This way it is possible to search for the whole configuration
|
||||
tree, when starting from root node, or to do a partial search when starting
|
||||
from a specified branch. The lookup should be case sensitive.
|
||||
|
||||
:param str comment: The content of comment to search for
|
||||
|
||||
:returns: A list of found CommentNode objects.
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_child(self, child):
|
||||
def delete_child(self, child: "ParserNode") -> None:
|
||||
"""
|
||||
Remove a specified child node from the list of children of the called
|
||||
BlockNode object.
|
||||
@@ -483,7 +494,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def unsaved_files(self):
|
||||
def unsaved_files(self) -> List[str]:
|
||||
"""
|
||||
Returns a list of file paths that have been changed since the last save
|
||||
(or the initial configuration parse). The intended use for this method
|
||||
@@ -496,7 +507,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def parsed_paths(self):
|
||||
def parsed_paths(self) -> List[str]:
|
||||
"""
|
||||
Returns a list of file paths that have currently been parsed into the parser
|
||||
tree. The returned list may include paths with wildcard characters, for
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
"""Module contains classes used by the Apache Configurator."""
|
||||
import re
|
||||
from typing import Any
|
||||
from typing import Iterable
|
||||
from typing import Optional
|
||||
from typing import Pattern
|
||||
from typing import Set
|
||||
|
||||
from certbot.plugins import common
|
||||
from certbot_apache._internal import interfaces
|
||||
|
||||
|
||||
class Addr(common.Addr):
|
||||
"""Represents an Apache address."""
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: Any):
|
||||
"""This is defined as equivalent within Apache.
|
||||
|
||||
ip_addr:* == ip_addr
|
||||
@@ -28,12 +33,12 @@ class Addr(common.Addr):
|
||||
# __cmp__ is overridden. See https://bugs.python.org/issue2235
|
||||
return super().__hash__()
|
||||
|
||||
def _addr_less_specific(self, addr):
|
||||
def _addr_less_specific(self, addr: "Addr") -> bool:
|
||||
"""Returns if addr.get_addr() is more specific than self.get_addr()."""
|
||||
# pylint: disable=protected-access
|
||||
return addr._rank_specific_addr() > self._rank_specific_addr()
|
||||
|
||||
def _rank_specific_addr(self):
|
||||
def _rank_specific_addr(self) -> int:
|
||||
"""Returns numerical rank for get_addr()
|
||||
|
||||
:returns: 2 - FQ, 1 - wildcard, 0 - _default_
|
||||
@@ -46,7 +51,7 @@ class Addr(common.Addr):
|
||||
return 1
|
||||
return 2
|
||||
|
||||
def conflicts(self, addr):
|
||||
def conflicts(self, addr: "Addr") -> bool:
|
||||
r"""Returns if address could conflict with correct function of self.
|
||||
|
||||
Could addr take away service provided by self within Apache?
|
||||
@@ -74,11 +79,11 @@ class Addr(common.Addr):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_wildcard(self):
|
||||
def is_wildcard(self) -> bool:
|
||||
"""Returns if address has a wildcard port."""
|
||||
return self.tup[1] == "*" or not self.tup[1]
|
||||
|
||||
def get_sni_addr(self, port):
|
||||
def get_sni_addr(self, port: str) -> common.Addr:
|
||||
"""Returns the least specific address that resolves on the port.
|
||||
|
||||
Examples:
|
||||
@@ -118,13 +123,16 @@ class VirtualHost:
|
||||
|
||||
"""
|
||||
# ?: is used for not returning enclosed characters
|
||||
strip_name = re.compile(r"^(?:.+://)?([^ :$]*)")
|
||||
strip_name: Pattern = re.compile(r"^(?:.+://)?([^ :$]*)")
|
||||
|
||||
def __init__(self, filep, path, addrs, ssl, enabled, name=None,
|
||||
aliases=None, modmacro=False, ancestor=None, node=None):
|
||||
def __init__(
|
||||
self, filepath: str, path: str, addrs: Set["Addr"],
|
||||
ssl: bool, enabled: bool, name: Optional[str] = None,
|
||||
aliases: Optional[Set[str]] = None, modmacro: bool = False,
|
||||
ancestor: Optional["VirtualHost"] = None, node = None):
|
||||
|
||||
"""Initialize a VH."""
|
||||
self.filep = filep
|
||||
self.filep = filepath
|
||||
self.path = path
|
||||
self.addrs = addrs
|
||||
self.name = name
|
||||
@@ -133,9 +141,9 @@ class VirtualHost:
|
||||
self.enabled = enabled
|
||||
self.modmacro = modmacro
|
||||
self.ancestor = ancestor
|
||||
self.node = node
|
||||
self.node: interfaces.BlockNode = node
|
||||
|
||||
def get_names(self):
|
||||
def get_names(self) -> Set[str]:
|
||||
"""Return a set of all names."""
|
||||
all_names: Set[str] = set()
|
||||
all_names.update(self.aliases)
|
||||
@@ -157,7 +165,7 @@ class VirtualHost:
|
||||
f"mod_macro Vhost: {'Yes' if self.modmacro else 'No'}"
|
||||
)
|
||||
|
||||
def display_repr(self):
|
||||
def display_repr(self) -> str:
|
||||
"""Return a representation of VHost to be used in dialog"""
|
||||
return (
|
||||
f"File: {self.filep}\n"
|
||||
@@ -166,7 +174,7 @@ class VirtualHost:
|
||||
f"HTTPS: {'Yes' if self.ssl else 'No'}\n"
|
||||
)
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
if isinstance(other, self.__class__):
|
||||
return (self.filep == other.filep and self.path == other.path and
|
||||
self.addrs == other.addrs and
|
||||
@@ -182,7 +190,7 @@ class VirtualHost:
|
||||
tuple(self.addrs), tuple(self.get_names()),
|
||||
self.ssl, self.enabled, self.modmacro))
|
||||
|
||||
def conflicts(self, addrs):
|
||||
def conflicts(self, addrs: Iterable[Addr]) -> bool:
|
||||
"""See if vhost conflicts with any of the addrs.
|
||||
|
||||
This determines whether or not these addresses would/could overwrite
|
||||
@@ -201,7 +209,7 @@ class VirtualHost:
|
||||
return True
|
||||
return False
|
||||
|
||||
def same_server(self, vhost, generic=False):
|
||||
def same_server(self, vhost: "VirtualHost", generic: bool = False) -> bool:
|
||||
"""Determines if the vhost is the same 'server'.
|
||||
|
||||
Used in redirection - indicates whether or not the two virtual hosts
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
""" Distribution specific override class for CentOS family (RHEL, Fedora) """
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import List
|
||||
|
||||
@@ -30,7 +31,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
challenge_location="/etc/httpd/conf.d",
|
||||
)
|
||||
|
||||
def config_test(self):
|
||||
def config_test(self) -> None:
|
||||
"""
|
||||
Override config_test to mitigate configtest error in vanilla installation
|
||||
of mod_ssl in Fedora. The error is caused by non-existent self-signed
|
||||
@@ -49,7 +50,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
else:
|
||||
raise
|
||||
|
||||
def _try_restart_fedora(self):
|
||||
def _try_restart_fedora(self) -> None:
|
||||
"""
|
||||
Tries to restart httpd using systemctl to generate the self signed key pair.
|
||||
"""
|
||||
@@ -62,7 +63,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
# Finish with actual config check to see if systemctl restart helped
|
||||
super().config_test()
|
||||
|
||||
def _prepare_options(self):
|
||||
def _prepare_options(self) -> None:
|
||||
"""
|
||||
Override the options dictionary initialization in order to support
|
||||
alternative restart cmd used in CentOS.
|
||||
@@ -72,13 +73,12 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
raise ValueError("OS option restart_cmd_alt must be set for CentOS.")
|
||||
self.options.restart_cmd_alt[0] = self.options.ctl
|
||||
|
||||
def get_parser(self):
|
||||
def get_parser(self) -> "CentOSParser":
|
||||
"""Initializes the ApacheParser"""
|
||||
return CentOSParser(
|
||||
self.options.server_root, self.options.vhost_root,
|
||||
self.version, configurator=self)
|
||||
self.options.server_root, self, self.options.vhost_root, self.version)
|
||||
|
||||
def _deploy_cert(self, *args, **kwargs): # pylint: disable=arguments-differ
|
||||
def _deploy_cert(self, *args: Any, **kwargs: Any): # pylint: disable=arguments-differ
|
||||
"""
|
||||
Override _deploy_cert in order to ensure that the Apache configuration
|
||||
has "LoadModule ssl_module..." before parsing the VirtualHost configuration
|
||||
@@ -88,7 +88,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
if self.version < (2, 4, 0):
|
||||
self._deploy_loadmodule_ssl_if_needed()
|
||||
|
||||
def _deploy_loadmodule_ssl_if_needed(self):
|
||||
def _deploy_loadmodule_ssl_if_needed(self) -> None:
|
||||
"""
|
||||
Add "LoadModule ssl_module <pre-existing path>" to main httpd.conf if
|
||||
it doesn't exist there already.
|
||||
@@ -110,14 +110,13 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
"use, and run Certbot again.")
|
||||
raise MisconfigurationError(msg)
|
||||
else:
|
||||
loadmod_args = path_args
|
||||
loadmod_args = [arg for arg in path_args if arg]
|
||||
|
||||
centos_parser: CentOSParser = cast(CentOSParser, self.parser)
|
||||
if centos_parser.not_modssl_ifmodule(noarg_path):
|
||||
if centos_parser.loc["default"] in noarg_path:
|
||||
# LoadModule already in the main configuration file
|
||||
if ("ifmodule/" in noarg_path.lower() or
|
||||
"ifmodule[1]" in noarg_path.lower()):
|
||||
if "ifmodule/" in noarg_path.lower() or "ifmodule[1]" in noarg_path.lower():
|
||||
# It's the first or only IfModule in the file
|
||||
return
|
||||
# Populate the list of known !mod_ssl.c IfModules
|
||||
@@ -148,8 +147,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
self.parser.aug.remove(loadmod_path)
|
||||
|
||||
# Create a new IfModule !mod_ssl.c if not already found on path
|
||||
ssl_ifmod = self.parser.get_ifmod(nodir_path, "!mod_ssl.c",
|
||||
beginning=True)[:-1]
|
||||
ssl_ifmod = self.parser.get_ifmod(nodir_path, "!mod_ssl.c", beginning=True)[:-1]
|
||||
if ssl_ifmod not in correct_ifmods:
|
||||
self.parser.add_dir(ssl_ifmod, "LoadModule", loadmod_args)
|
||||
correct_ifmods.append(ssl_ifmod)
|
||||
@@ -159,24 +157,24 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
||||
|
||||
class CentOSParser(parser.ApacheParser):
|
||||
"""CentOS specific ApacheParser override class"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
# CentOS specific configuration file for Apache
|
||||
self.sysconfig_filep = "/etc/sysconfig/httpd"
|
||||
self.sysconfig_filep: str = "/etc/sysconfig/httpd"
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def update_runtime_variables(self):
|
||||
def update_runtime_variables(self) -> None:
|
||||
""" Override for update_runtime_variables for custom parsing """
|
||||
# Opportunistic, works if SELinux not enforced
|
||||
super().update_runtime_variables()
|
||||
self.parse_sysconfig_var()
|
||||
|
||||
def parse_sysconfig_var(self):
|
||||
def parse_sysconfig_var(self) -> None:
|
||||
""" Parses Apache CLI options from CentOS configuration file """
|
||||
defines = apache_util.parse_define_file(self.sysconfig_filep, "OPTIONS")
|
||||
for k, v in defines.items():
|
||||
self.variables[k] = v
|
||||
|
||||
def not_modssl_ifmodule(self, path):
|
||||
def not_modssl_ifmodule(self, path: str) -> bool:
|
||||
"""Checks if the provided Augeas path has argument !mod_ssl"""
|
||||
|
||||
if "ifmodule" not in path.lower():
|
||||
|
||||
@@ -8,6 +8,7 @@ from certbot.compat import os
|
||||
from certbot_apache._internal import apache_util
|
||||
from certbot_apache._internal import configurator
|
||||
from certbot_apache._internal.configurator import OsOptions
|
||||
from certbot_apache._internal.obj import VirtualHost
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -22,7 +23,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
handle_sites=True,
|
||||
)
|
||||
|
||||
def enable_site(self, vhost):
|
||||
def enable_site(self, vhost: VirtualHost) -> None:
|
||||
"""Enables an available site, Apache reload required.
|
||||
|
||||
.. note:: Does not make sure that the site correctly works or that all
|
||||
@@ -67,7 +68,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
self.save_notes += "Enabled site %s\n" % vhost.filep
|
||||
return None
|
||||
|
||||
def enable_mod(self, mod_name, temp=False):
|
||||
def enable_mod(self, mod_name: str, temp: bool = False) -> None:
|
||||
"""Enables module in Apache.
|
||||
|
||||
Both enables and reloads Apache so module is active.
|
||||
@@ -113,7 +114,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
# Reload is not necessary as DUMP_RUN_CFG uses latest config.
|
||||
self.parser.update_runtime_variables()
|
||||
|
||||
def _enable_mod_debian(self, mod_name, temp):
|
||||
def _enable_mod_debian(self, mod_name: str, temp: bool) -> None:
|
||||
"""Assumes mods-available, mods-enabled layout."""
|
||||
# Generate reversal command.
|
||||
# Try to be safe here... check that we can probably reverse before
|
||||
@@ -124,6 +125,5 @@ class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
"Unable to find a2dismod, please make sure a2enmod and "
|
||||
"a2dismod are configured correctly for certbot.")
|
||||
|
||||
self.reverter.register_undo_command(
|
||||
temp, [self.options.dismod, "-f", mod_name])
|
||||
self.reverter.register_undo_command(temp, [self.options.dismod, "-f", mod_name])
|
||||
util.run_script([self.options.enmod, mod_name])
|
||||
|
||||
@@ -23,7 +23,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
|
||||
challenge_location="/etc/httpd/conf.d",
|
||||
)
|
||||
|
||||
def config_test(self):
|
||||
def config_test(self) -> None:
|
||||
"""
|
||||
Override config_test to mitigate configtest error in vanilla installation
|
||||
of mod_ssl in Fedora. The error is caused by non-existent self-signed
|
||||
@@ -35,13 +35,12 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
|
||||
except errors.MisconfigurationError:
|
||||
self._try_restart_fedora()
|
||||
|
||||
def get_parser(self):
|
||||
def get_parser(self) -> "FedoraParser":
|
||||
"""Initializes the ApacheParser"""
|
||||
return FedoraParser(
|
||||
self.options.server_root, self.options.vhost_root,
|
||||
self.version, configurator=self)
|
||||
self.options.server_root, self, self.options.vhost_root, self.version)
|
||||
|
||||
def _try_restart_fedora(self):
|
||||
def _try_restart_fedora(self) -> None:
|
||||
"""
|
||||
Tries to restart httpd using systemctl to generate the self signed key pair.
|
||||
"""
|
||||
@@ -53,7 +52,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
|
||||
# Finish with actual config check to see if systemctl restart helped
|
||||
super().config_test()
|
||||
|
||||
def _prepare_options(self):
|
||||
def _prepare_options(self) -> None:
|
||||
"""
|
||||
Override the options dictionary initialization to keep using apachectl
|
||||
instead of httpd and so take advantages of this new bash script in newer versions
|
||||
@@ -69,18 +68,18 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
|
||||
|
||||
class FedoraParser(parser.ApacheParser):
|
||||
"""Fedora 29+ specific ApacheParser override class"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
# Fedora 29+ specific configuration file for Apache
|
||||
self.sysconfig_filep = "/etc/sysconfig/httpd"
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def update_runtime_variables(self):
|
||||
def update_runtime_variables(self) -> None:
|
||||
""" Override for update_runtime_variables for custom parsing """
|
||||
# Opportunistic, works if SELinux not enforced
|
||||
super().update_runtime_variables()
|
||||
self._parse_sysconfig_var()
|
||||
|
||||
def _parse_sysconfig_var(self):
|
||||
def _parse_sysconfig_var(self) -> None:
|
||||
""" Parses Apache CLI options from Fedora configuration file """
|
||||
defines = apache_util.parse_define_file(self.sysconfig_filep, "OPTIONS")
|
||||
for k, v in defines.items():
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
""" Distribution specific override class for Gentoo Linux """
|
||||
from typing import Any
|
||||
|
||||
from certbot_apache._internal import apache_util
|
||||
from certbot_apache._internal import configurator
|
||||
from certbot_apache._internal import parser
|
||||
@@ -16,7 +18,7 @@ class GentooConfigurator(configurator.ApacheConfigurator):
|
||||
challenge_location="/etc/apache2/vhosts.d",
|
||||
)
|
||||
|
||||
def _prepare_options(self):
|
||||
def _prepare_options(self) -> None:
|
||||
"""
|
||||
Override the options dictionary initialization in order to support
|
||||
alternative restart cmd used in Gentoo.
|
||||
@@ -26,33 +28,32 @@ class GentooConfigurator(configurator.ApacheConfigurator):
|
||||
raise ValueError("OS option restart_cmd_alt must be set for Gentoo.")
|
||||
self.options.restart_cmd_alt[0] = self.options.ctl
|
||||
|
||||
def get_parser(self):
|
||||
def get_parser(self) -> "GentooParser":
|
||||
"""Initializes the ApacheParser"""
|
||||
return GentooParser(
|
||||
self.options.server_root, self.options.vhost_root,
|
||||
self.version, configurator=self)
|
||||
self.options.server_root, self, self.options.vhost_root, self.version)
|
||||
|
||||
|
||||
class GentooParser(parser.ApacheParser):
|
||||
"""Gentoo specific ApacheParser override class"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
# Gentoo specific configuration file for Apache2
|
||||
self.apacheconfig_filep = "/etc/conf.d/apache2"
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def update_runtime_variables(self):
|
||||
def update_runtime_variables(self) -> None:
|
||||
""" Override for update_runtime_variables for custom parsing """
|
||||
self.parse_sysconfig_var()
|
||||
self.update_modules()
|
||||
|
||||
def parse_sysconfig_var(self):
|
||||
def parse_sysconfig_var(self) -> None:
|
||||
""" Parses Apache CLI options from Gentoo configuration file """
|
||||
defines = apache_util.parse_define_file(self.apacheconfig_filep,
|
||||
"APACHE2_OPTS")
|
||||
for k, v in defines.items():
|
||||
self.variables[k] = v
|
||||
|
||||
def update_modules(self):
|
||||
def update_modules(self) -> None:
|
||||
"""Get loaded modules from httpd process, and add them to DOM"""
|
||||
mod_cmd = [self.configurator.options.ctl, "modules"]
|
||||
matches = apache_util.parse_from_subprocess(mod_cmd, r"(.*)_module")
|
||||
|
||||
@@ -3,15 +3,26 @@ import copy
|
||||
import fnmatch
|
||||
import logging
|
||||
import re
|
||||
from typing import Collection
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Pattern
|
||||
from typing import Set
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
||||
from certbot import errors
|
||||
from certbot.compat import os
|
||||
from certbot_apache._internal import apache_util
|
||||
from certbot_apache._internal import constants
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from certbot_apache._internal.configurator import ApacheConfigurator # pragma: no cover
|
||||
|
||||
try:
|
||||
from augeas import Augeas
|
||||
except ImportError: # pragma: no cover
|
||||
@@ -32,11 +43,11 @@ class ApacheParser:
|
||||
default - user config file, name - NameVirtualHost,
|
||||
|
||||
"""
|
||||
arg_var_interpreter = re.compile(r"\$\{[^ \}]*}")
|
||||
fnmatch_chars = {"*", "?", "\\", "[", "]"}
|
||||
arg_var_interpreter: Pattern = re.compile(r"\$\{[^ \}]*}")
|
||||
fnmatch_chars: Set[str] = {"*", "?", "\\", "[", "]"}
|
||||
|
||||
def __init__(self, root, vhostroot=None, version=(2, 4),
|
||||
configurator=None):
|
||||
def __init__(self, root: str, configurator: "ApacheConfigurator",
|
||||
vhostroot: str, version: Tuple[int, ...] = (2, 4)) -> None:
|
||||
# Note: Order is important here.
|
||||
|
||||
# Needed for calling save() with reverter functionality that resides in
|
||||
@@ -45,7 +56,7 @@ class ApacheParser:
|
||||
self.configurator = configurator
|
||||
|
||||
# Initialize augeas
|
||||
self.aug = init_augeas()
|
||||
self.aug: Augeas = init_augeas()
|
||||
|
||||
if not self.check_aug_version():
|
||||
raise errors.NotSupportedError(
|
||||
@@ -58,8 +69,8 @@ class ApacheParser:
|
||||
self.variables: Dict[str, str] = {}
|
||||
|
||||
# Find configuration root and make sure augeas can parse it.
|
||||
self.root = os.path.abspath(root)
|
||||
self.loc = {"root": self._find_config_root()}
|
||||
self.root: str = os.path.abspath(root)
|
||||
self.loc: Dict[str, str] = {"root": self._find_config_root()}
|
||||
self.parse_file(self.loc["root"])
|
||||
|
||||
if version >= (2, 4):
|
||||
@@ -88,7 +99,7 @@ class ApacheParser:
|
||||
if self.find_dir("Define", exclude=False):
|
||||
raise errors.PluginError("Error parsing runtime variables")
|
||||
|
||||
def check_parsing_errors(self, lens):
|
||||
def check_parsing_errors(self, lens: str) -> None:
|
||||
"""Verify Augeas can parse all of the lens files.
|
||||
|
||||
:param str lens: lens to check for errors
|
||||
@@ -114,7 +125,7 @@ class ApacheParser:
|
||||
self.aug.get(path + "/message")))
|
||||
raise errors.PluginError(msg)
|
||||
|
||||
def check_aug_version(self):
|
||||
def check_aug_version(self) -> bool:
|
||||
""" Checks that we have recent enough version of libaugeas.
|
||||
If augeas version is recent enough, it will support case insensitive
|
||||
regexp matching"""
|
||||
@@ -129,7 +140,7 @@ class ApacheParser:
|
||||
self.aug.remove("/test/path")
|
||||
return matches
|
||||
|
||||
def unsaved_files(self):
|
||||
def unsaved_files(self) -> Set[str]:
|
||||
"""Lists files that have modified Augeas DOM but the changes have not
|
||||
been written to the filesystem yet, used by `self.save()` and
|
||||
ApacheConfigurator to check the file state.
|
||||
@@ -168,7 +179,7 @@ class ApacheParser:
|
||||
save_files.add(self.aug.get(path)[6:])
|
||||
return save_files
|
||||
|
||||
def ensure_augeas_state(self):
|
||||
def ensure_augeas_state(self) -> None:
|
||||
"""Makes sure that all Augeas dom changes are written to files to avoid
|
||||
loss of configuration directives when doing additional augeas parsing,
|
||||
causing a possible augeas.load() resulting dom reset
|
||||
@@ -178,7 +189,7 @@ class ApacheParser:
|
||||
self.configurator.save_notes += "(autosave)"
|
||||
self.configurator.save()
|
||||
|
||||
def save(self, save_files):
|
||||
def save(self, save_files: Iterable[str]) -> None:
|
||||
"""Saves all changes to the configuration files.
|
||||
|
||||
save() is called from ApacheConfigurator to handle the parser specific
|
||||
@@ -197,7 +208,7 @@ class ApacheParser:
|
||||
self.aug.remove("/files/"+sf)
|
||||
self.aug.load()
|
||||
|
||||
def _log_save_errors(self, ex_errs):
|
||||
def _log_save_errors(self, ex_errs: List[str]) -> None:
|
||||
"""Log errors due to bad Augeas save.
|
||||
|
||||
:param list ex_errs: Existing errors before save
|
||||
@@ -211,7 +222,7 @@ class ApacheParser:
|
||||
# Only new errors caused by recent save
|
||||
if err not in ex_errs), self.configurator.save_notes)
|
||||
|
||||
def add_include(self, main_config, inc_path):
|
||||
def add_include(self, main_config: str, inc_path: str) -> None:
|
||||
"""Add Include for a new configuration file if one does not exist
|
||||
|
||||
:param str main_config: file path to main Apache config file
|
||||
@@ -230,21 +241,21 @@ class ApacheParser:
|
||||
new_file = os.path.basename(inc_path)
|
||||
self.existing_paths.setdefault(new_dir, []).append(new_file)
|
||||
|
||||
def add_mod(self, mod_name):
|
||||
def add_mod(self, mod_name: str) -> None:
|
||||
"""Shortcut for updating parser modules."""
|
||||
if mod_name + "_module" not in self.modules:
|
||||
self.modules[mod_name + "_module"] = None
|
||||
if "mod_" + mod_name + ".c" not in self.modules:
|
||||
self.modules["mod_" + mod_name + ".c"] = None
|
||||
|
||||
def reset_modules(self):
|
||||
def reset_modules(self) -> None:
|
||||
"""Reset the loaded modules list. This is called from cleanup to clear
|
||||
temporarily loaded modules."""
|
||||
self.modules = {}
|
||||
self.update_modules()
|
||||
self.parse_modules()
|
||||
|
||||
def parse_modules(self):
|
||||
def parse_modules(self) -> None:
|
||||
"""Iterates on the configuration until no new modules are loaded.
|
||||
|
||||
..todo:: This should be attempted to be done with a binary to avoid
|
||||
@@ -272,19 +283,18 @@ class ApacheParser:
|
||||
match_name[6:])
|
||||
self.modules.update(mods)
|
||||
|
||||
def update_runtime_variables(self):
|
||||
def update_runtime_variables(self) -> None:
|
||||
"""Update Includes, Defines and Includes from httpd config dump data"""
|
||||
|
||||
self.update_defines()
|
||||
self.update_includes()
|
||||
self.update_modules()
|
||||
|
||||
def update_defines(self):
|
||||
def update_defines(self) -> None:
|
||||
"""Updates the dictionary of known variables in the configuration"""
|
||||
|
||||
self.variables = apache_util.parse_defines(self.configurator.options.ctl)
|
||||
|
||||
def update_includes(self):
|
||||
def update_includes(self) -> None:
|
||||
"""Get includes from httpd process, and add them to DOM if needed"""
|
||||
|
||||
# Find_dir iterates over configuration for Include and IncludeOptional
|
||||
@@ -298,28 +308,28 @@ class ApacheParser:
|
||||
if not self.parsed_in_current(i):
|
||||
self.parse_file(i)
|
||||
|
||||
def update_modules(self):
|
||||
def update_modules(self) -> None:
|
||||
"""Get loaded modules from httpd process, and add them to DOM"""
|
||||
|
||||
matches = apache_util.parse_modules(self.configurator.options.ctl)
|
||||
for mod in matches:
|
||||
self.add_mod(mod.strip())
|
||||
|
||||
def filter_args_num(self, matches, args):
|
||||
def filter_args_num(self, matches: str, args: int) -> List[str]:
|
||||
"""Filter out directives with specific number of arguments.
|
||||
|
||||
This function makes the assumption that all related arguments are given
|
||||
in order. Thus /files/apache/directive[5]/arg[2] must come immediately
|
||||
after /files/apache/directive[5]/arg[1]. Runs in 1 linear pass.
|
||||
|
||||
:param string matches: Matches of all directives with arg nodes
|
||||
:param str matches: Matches of all directives with arg nodes
|
||||
:param int args: Number of args you would like to filter
|
||||
|
||||
:returns: List of directives that contain # of arguments.
|
||||
(arg is stripped off)
|
||||
|
||||
"""
|
||||
filtered = []
|
||||
filtered: List[str] = []
|
||||
if args == 1:
|
||||
for i, match in enumerate(matches):
|
||||
if match.endswith("/arg"):
|
||||
@@ -336,7 +346,7 @@ class ApacheParser:
|
||||
|
||||
return filtered
|
||||
|
||||
def add_dir_to_ifmodssl(self, aug_conf_path, directive, args):
|
||||
def add_dir_to_ifmodssl(self, aug_conf_path: str, directive: str, args: List[str]) -> None:
|
||||
"""Adds directive and value to IfMod ssl block.
|
||||
|
||||
Adds given directive and value along configuration path within
|
||||
@@ -362,7 +372,7 @@ class ApacheParser:
|
||||
for i, arg in enumerate(args):
|
||||
self.aug.set("%s/arg[%d]" % (nvh_path, i + 1), arg)
|
||||
|
||||
def get_ifmod(self, aug_conf_path, mod, beginning=False):
|
||||
def get_ifmod(self, aug_conf_path: str, mod: str, beginning: bool = False) -> str:
|
||||
"""Returns the path to <IfMod mod> and creates one if it doesn't exist.
|
||||
|
||||
:param str aug_conf_path: Augeas configuration path
|
||||
@@ -384,7 +394,7 @@ class ApacheParser:
|
||||
# Strip off "arg" at end of first ifmod path
|
||||
return if_mods[0].rpartition("arg")[0]
|
||||
|
||||
def create_ifmod(self, aug_conf_path, mod, beginning=False):
|
||||
def create_ifmod(self, aug_conf_path: str, mod: str, beginning: bool = False) -> str:
|
||||
"""Creates a new <IfMod mod> and returns its path.
|
||||
|
||||
:param str aug_conf_path: Augeas configuration path
|
||||
@@ -411,7 +421,9 @@ class ApacheParser:
|
||||
self.aug.set(c_path_arg, mod)
|
||||
return retpath
|
||||
|
||||
def add_dir(self, aug_conf_path, directive, args):
|
||||
def add_dir(
|
||||
self, aug_conf_path: str, directive: Optional[str], args: Union[List[str], str]
|
||||
) -> None:
|
||||
"""Appends directive to the end fo the file given by aug_conf_path.
|
||||
|
||||
.. note:: Not added to AugeasConfigurator because it may depend
|
||||
@@ -431,7 +443,8 @@ class ApacheParser:
|
||||
else:
|
||||
self.aug.set(aug_conf_path + "/directive[last()]/arg", args)
|
||||
|
||||
def add_dir_beginning(self, aug_conf_path, dirname, args):
|
||||
def add_dir_beginning(self, aug_conf_path: str, dirname: str,
|
||||
args: Union[List[str], str]) -> None:
|
||||
"""Adds the directive to the beginning of defined aug_conf_path.
|
||||
|
||||
:param str aug_conf_path: Augeas configuration path to add directive
|
||||
@@ -452,7 +465,7 @@ class ApacheParser:
|
||||
else:
|
||||
self.aug.set(first_dir + "/arg", args)
|
||||
|
||||
def add_comment(self, aug_conf_path, comment):
|
||||
def add_comment(self, aug_conf_path: str, comment: str) -> None:
|
||||
"""Adds the comment to the augeas path
|
||||
|
||||
:param str aug_conf_path: Augeas configuration path to add directive
|
||||
@@ -461,7 +474,7 @@ class ApacheParser:
|
||||
"""
|
||||
self.aug.set(aug_conf_path + "/#comment[last() + 1]", comment)
|
||||
|
||||
def find_comments(self, arg, start=None):
|
||||
def find_comments(self, arg: str, start: Optional[str] = None) -> List[str]:
|
||||
"""Finds a comment with specified content from the provided DOM path
|
||||
|
||||
:param str arg: Comment content to search
|
||||
@@ -483,7 +496,8 @@ class ApacheParser:
|
||||
results.append(comment)
|
||||
return results
|
||||
|
||||
def find_dir(self, directive, arg=None, start=None, exclude=True):
|
||||
def find_dir(self, directive: str, arg: Optional[str] = None,
|
||||
start: Optional[str] = None, exclude: bool = True) -> List[str]:
|
||||
"""Finds directive in the configuration.
|
||||
|
||||
Recursively searches through config files to find directives
|
||||
@@ -511,6 +525,8 @@ class ApacheParser:
|
||||
:param bool exclude: Whether or not to exclude directives based on
|
||||
variables and enabled modules
|
||||
|
||||
:rtype list
|
||||
|
||||
"""
|
||||
# Cannot place member variable in the definition of the function so...
|
||||
if not start:
|
||||
@@ -559,7 +575,7 @@ class ApacheParser:
|
||||
|
||||
return ordered_matches
|
||||
|
||||
def get_all_args(self, match):
|
||||
def get_all_args(self, match: str) -> List[Optional[str]]:
|
||||
"""
|
||||
Tries to fetch all arguments for a directive. See get_arg.
|
||||
|
||||
@@ -569,11 +585,11 @@ class ApacheParser:
|
||||
"""
|
||||
|
||||
if match[-1] != "/":
|
||||
match = match+"/"
|
||||
match = match + "/"
|
||||
allargs = self.aug.match(match + '*')
|
||||
return [self.get_arg(arg) for arg in allargs]
|
||||
|
||||
def get_arg(self, match):
|
||||
def get_arg(self, match: Optional[str]) -> Optional[str]:
|
||||
"""Uses augeas.get to get argument value and interprets result.
|
||||
|
||||
This also converts all variables and parameters appropriately.
|
||||
@@ -588,6 +604,7 @@ class ApacheParser:
|
||||
# e.g. strip now, not later
|
||||
if not value:
|
||||
return None
|
||||
|
||||
value = value.strip("'\"")
|
||||
|
||||
variables = ApacheParser.arg_var_interpreter.findall(value)
|
||||
@@ -601,13 +618,13 @@ class ApacheParser:
|
||||
|
||||
return value
|
||||
|
||||
def get_root_augpath(self):
|
||||
def get_root_augpath(self) -> str:
|
||||
"""
|
||||
Returns the Augeas path of root configuration.
|
||||
"""
|
||||
return get_aug_path(self.loc["root"])
|
||||
|
||||
def exclude_dirs(self, matches):
|
||||
def exclude_dirs(self, matches: Iterable[str]) -> List[str]:
|
||||
"""Exclude directives that are not loaded into the configuration."""
|
||||
filters = [("ifmodule", self.modules.keys()), ("ifdefine", self.variables)]
|
||||
|
||||
@@ -621,7 +638,7 @@ class ApacheParser:
|
||||
valid_matches.append(match)
|
||||
return valid_matches
|
||||
|
||||
def _pass_filter(self, match, filter_):
|
||||
def _pass_filter(self, match: str, filter_: Tuple[str, Collection[str]]) -> bool:
|
||||
"""Determine if directive passes a filter.
|
||||
|
||||
:param str match: Augeas path
|
||||
@@ -650,7 +667,7 @@ class ApacheParser:
|
||||
|
||||
return True
|
||||
|
||||
def standard_path_from_server_root(self, arg):
|
||||
def standard_path_from_server_root(self, arg: str) -> str:
|
||||
"""Ensure paths are consistent and absolute
|
||||
|
||||
:param str arg: Argument of directive
|
||||
@@ -669,7 +686,7 @@ class ApacheParser:
|
||||
arg = os.path.normpath(arg)
|
||||
return arg
|
||||
|
||||
def _get_include_path(self, arg):
|
||||
def _get_include_path(self, arg: Optional[str]) -> Optional[str]:
|
||||
"""Converts an Apache Include directive into Augeas path.
|
||||
|
||||
Converts an Apache Include directive argument into an Augeas
|
||||
@@ -689,6 +706,8 @@ class ApacheParser:
|
||||
# if matchObj.group() != arg:
|
||||
# logger.error("Error: Invalid regexp characters in %s", arg)
|
||||
# return []
|
||||
if arg is None:
|
||||
return None # pragma: no cover
|
||||
arg = self.standard_path_from_server_root(arg)
|
||||
|
||||
# Attempts to add a transform to the file if one does not already exist
|
||||
@@ -713,7 +732,7 @@ class ApacheParser:
|
||||
|
||||
return get_aug_path(arg)
|
||||
|
||||
def fnmatch_to_re(self, clean_fn_match):
|
||||
def fnmatch_to_re(self, clean_fn_match: str) -> str:
|
||||
"""Method converts Apache's basic fnmatch to regular expression.
|
||||
|
||||
Assumption - Configs are assumed to be well-formed and only writable by
|
||||
@@ -730,7 +749,7 @@ class ApacheParser:
|
||||
# Since Python 3.6, it returns a different pattern like (?s:.*\.load)\Z
|
||||
return fnmatch.translate(clean_fn_match)[4:-3] # pragma: no cover
|
||||
|
||||
def parse_file(self, filepath):
|
||||
def parse_file(self, filepath: str) -> None:
|
||||
"""Parse file with Augeas
|
||||
|
||||
Checks to see if file_path is parsed by Augeas
|
||||
@@ -757,7 +776,7 @@ class ApacheParser:
|
||||
self._add_httpd_transform(filepath)
|
||||
self.aug.load()
|
||||
|
||||
def parsed_in_current(self, filep):
|
||||
def parsed_in_current(self, filep: Optional[str]) -> bool:
|
||||
"""Checks if the file path is parsed by current Augeas parser config
|
||||
ie. returns True if the file is found on a path that's found in live
|
||||
Augeas configuration.
|
||||
@@ -767,9 +786,11 @@ class ApacheParser:
|
||||
:returns: True if file is parsed in existing configuration tree
|
||||
:rtype: bool
|
||||
"""
|
||||
if not filep:
|
||||
return False # pragma: no cover
|
||||
return self._parsed_by_parser_paths(filep, self.parser_paths)
|
||||
|
||||
def parsed_in_original(self, filep):
|
||||
def parsed_in_original(self, filep: Optional[str]) -> bool:
|
||||
"""Checks if the file path is parsed by existing Apache config.
|
||||
ie. returns True if the file is found on a path that matches Include or
|
||||
IncludeOptional statement in the Apache configuration.
|
||||
@@ -779,9 +800,11 @@ class ApacheParser:
|
||||
:returns: True if file is parsed in existing configuration tree
|
||||
:rtype: bool
|
||||
"""
|
||||
if not filep:
|
||||
return False # pragma: no cover
|
||||
return self._parsed_by_parser_paths(filep, self.existing_paths)
|
||||
|
||||
def _parsed_by_parser_paths(self, filep, paths):
|
||||
def _parsed_by_parser_paths(self, filep: str, paths: Mapping[str, List[str]]) -> bool:
|
||||
"""Helper function that searches through provided paths and returns
|
||||
True if file path is found in the set"""
|
||||
for directory in paths:
|
||||
@@ -790,7 +813,7 @@ class ApacheParser:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_path_actions(self, filepath):
|
||||
def _check_path_actions(self, filepath: str) -> Tuple[bool, bool]:
|
||||
"""Determine actions to take with a new augeas path
|
||||
|
||||
This helper function will return a tuple that defines
|
||||
@@ -815,7 +838,7 @@ class ApacheParser:
|
||||
remove_old = False
|
||||
return use_new, remove_old
|
||||
|
||||
def _remove_httpd_transform(self, filepath):
|
||||
def _remove_httpd_transform(self, filepath: str) -> None:
|
||||
"""Remove path from Augeas transform
|
||||
|
||||
:param str filepath: filepath to remove
|
||||
@@ -830,7 +853,7 @@ class ApacheParser:
|
||||
self.aug.remove(remove_inc[0])
|
||||
self.parser_paths.pop(remove_dirname)
|
||||
|
||||
def _add_httpd_transform(self, incl):
|
||||
def _add_httpd_transform(self, incl: str) -> None:
|
||||
"""Add a transform to Augeas.
|
||||
|
||||
This function will correctly add a transform to augeas
|
||||
@@ -840,7 +863,7 @@ class ApacheParser:
|
||||
:param str incl: filepath to include for transform
|
||||
|
||||
"""
|
||||
last_include = self.aug.match("/augeas/load/Httpd/incl [last()]")
|
||||
last_include: str = self.aug.match("/augeas/load/Httpd/incl [last()]")
|
||||
if last_include:
|
||||
# Insert a new node immediately after the last incl
|
||||
self.aug.insert(last_include[0], "incl", False)
|
||||
@@ -858,7 +881,7 @@ class ApacheParser:
|
||||
self.parser_paths[os.path.dirname(incl)] = [
|
||||
os.path.basename(incl)]
|
||||
|
||||
def standardize_excl(self):
|
||||
def standardize_excl(self) -> None:
|
||||
"""Standardize the excl arguments for the Httpd lens in Augeas.
|
||||
|
||||
Note: Hack!
|
||||
@@ -890,16 +913,16 @@ class ApacheParser:
|
||||
|
||||
self.aug.load()
|
||||
|
||||
def _set_locations(self):
|
||||
def _set_locations(self) -> Dict[str, str]:
|
||||
"""Set default location for directives.
|
||||
|
||||
Locations are given as file_paths
|
||||
.. todo:: Make sure that files are included
|
||||
|
||||
"""
|
||||
default = self.loc["root"]
|
||||
default: str = self.loc["root"]
|
||||
|
||||
temp = os.path.join(self.root, "ports.conf")
|
||||
temp: str = os.path.join(self.root, "ports.conf")
|
||||
if os.path.isfile(temp):
|
||||
listen = temp
|
||||
name = temp
|
||||
@@ -909,7 +932,7 @@ class ApacheParser:
|
||||
|
||||
return {"default": default, "listen": listen, "name": name}
|
||||
|
||||
def _find_config_root(self):
|
||||
def _find_config_root(self) -> str:
|
||||
"""Find the Apache Configuration Root file."""
|
||||
location = ["apache2.conf", "httpd.conf", "conf/httpd.conf"]
|
||||
for name in location:
|
||||
@@ -918,7 +941,7 @@ class ApacheParser:
|
||||
raise errors.NoInstallationError("Could not find configuration root")
|
||||
|
||||
|
||||
def case_i(string):
|
||||
def case_i(string: str) -> str:
|
||||
"""Returns case insensitive regex.
|
||||
|
||||
Returns a sloppy, but necessary version of a case insensitive regex.
|
||||
@@ -934,7 +957,7 @@ def case_i(string):
|
||||
if c.isalpha() else c for c in re.escape(string))
|
||||
|
||||
|
||||
def get_aug_path(file_path):
|
||||
def get_aug_path(file_path: str) -> str:
|
||||
"""Return augeas path for full filepath.
|
||||
|
||||
:param str file_path: Full filepath
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
"""ParserNode utils"""
|
||||
from typing import Dict
|
||||
from typing import Any
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def validate_kwargs(kwargs, required_names):
|
||||
def validate_kwargs(kwargs: Dict[str, Any], required_names: List[str]) -> Dict[str, Any]:
|
||||
"""
|
||||
Ensures that the kwargs dict has all the expected values. This function modifies
|
||||
the kwargs dictionary, and hence the returned dictionary should be used instead
|
||||
@@ -11,7 +16,7 @@ def validate_kwargs(kwargs, required_names):
|
||||
:param list required_names: List of required parameter names.
|
||||
"""
|
||||
|
||||
validated_kwargs = {}
|
||||
validated_kwargs: Dict[str, Any] = {}
|
||||
for name in required_names:
|
||||
try:
|
||||
validated_kwargs[name] = kwargs.pop(name)
|
||||
@@ -25,7 +30,7 @@ def validate_kwargs(kwargs, required_names):
|
||||
return validated_kwargs
|
||||
|
||||
|
||||
def parsernode_kwargs(kwargs):
|
||||
def parsernode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Any, ...]:
|
||||
"""
|
||||
Validates keyword arguments for ParserNode. This function modifies the kwargs
|
||||
dictionary, and hence the returned dictionary should be used instead in the
|
||||
@@ -55,7 +60,7 @@ def parsernode_kwargs(kwargs):
|
||||
return kwargs["ancestor"], kwargs["dirty"], kwargs["filepath"], kwargs["metadata"]
|
||||
|
||||
|
||||
def commentnode_kwargs(kwargs):
|
||||
def commentnode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, str]]:
|
||||
"""
|
||||
Validates keyword arguments for CommentNode and sets the default values for
|
||||
optional kwargs. This function modifies the kwargs dictionary, and hence the
|
||||
@@ -90,7 +95,7 @@ def commentnode_kwargs(kwargs):
|
||||
return comment, kwargs
|
||||
|
||||
|
||||
def directivenode_kwargs(kwargs):
|
||||
def directivenode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Any, Any, Any, Dict]:
|
||||
"""
|
||||
Validates keyword arguments for DirectiveNode and BlockNode and sets the
|
||||
default values for optional kwargs. This function modifies the kwargs
|
||||
|
||||
@@ -345,8 +345,8 @@ class ParserInitTest(util.ApacheTest):
|
||||
self.config.config_test = mock.Mock()
|
||||
self.assertRaises(
|
||||
errors.NoInstallationError, ApacheParser,
|
||||
os.path.relpath(self.config_path), "/dummy/vhostpath",
|
||||
version=(2, 4, 22), configurator=self.config)
|
||||
os.path.relpath(self.config_path), self.config,
|
||||
"/dummy/vhostpath", version=(2, 4, 22))
|
||||
|
||||
def test_init_old_aug(self):
|
||||
from certbot_apache._internal.parser import ApacheParser
|
||||
@@ -354,8 +354,8 @@ class ParserInitTest(util.ApacheTest):
|
||||
mock_c.return_value = False
|
||||
self.assertRaises(
|
||||
errors.NotSupportedError,
|
||||
ApacheParser, os.path.relpath(self.config_path),
|
||||
"/dummy/vhostpath", version=(2, 4, 22), configurator=self.config)
|
||||
ApacheParser, os.path.relpath(self.config_path), self.config,
|
||||
"/dummy/vhostpath", version=(2, 4, 22))
|
||||
|
||||
@mock.patch("certbot_apache._internal.apache_util._get_runtime_cfg")
|
||||
def test_unparseable(self, mock_cfg):
|
||||
@@ -363,8 +363,8 @@ class ParserInitTest(util.ApacheTest):
|
||||
mock_cfg.return_value = ('Define: TEST')
|
||||
self.assertRaises(
|
||||
errors.PluginError,
|
||||
ApacheParser, os.path.relpath(self.config_path),
|
||||
"/dummy/vhostpath", version=(2, 2, 22), configurator=self.config)
|
||||
ApacheParser, os.path.relpath(self.config_path), self.config,
|
||||
"/dummy/vhostpath", version=(2, 2, 22))
|
||||
|
||||
def test_root_normalized(self):
|
||||
from certbot_apache._internal.parser import ApacheParser
|
||||
@@ -375,7 +375,7 @@ class ParserInitTest(util.ApacheTest):
|
||||
self.temp_dir,
|
||||
"debian_apache_2_4/////multiple_vhosts/../multiple_vhosts/apache2")
|
||||
|
||||
parser = ApacheParser(path, "/dummy/vhostpath", configurator=self.config)
|
||||
parser = ApacheParser(path, self.config, "/dummy/vhostpath")
|
||||
|
||||
self.assertEqual(parser.root, self.config_path)
|
||||
|
||||
@@ -384,8 +384,7 @@ class ParserInitTest(util.ApacheTest):
|
||||
with mock.patch("certbot_apache._internal.parser.ApacheParser."
|
||||
"update_runtime_variables"):
|
||||
parser = ApacheParser(
|
||||
os.path.relpath(self.config_path),
|
||||
"/dummy/vhostpath", configurator=self.config)
|
||||
os.path.relpath(self.config_path), self.config, "/dummy/vhostpath")
|
||||
|
||||
self.assertEqual(parser.root, self.config_path)
|
||||
|
||||
@@ -394,8 +393,7 @@ class ParserInitTest(util.ApacheTest):
|
||||
with mock.patch("certbot_apache._internal.parser.ApacheParser."
|
||||
"update_runtime_variables"):
|
||||
parser = ApacheParser(
|
||||
self.config_path + os.path.sep,
|
||||
"/dummy/vhostpath", configurator=self.config)
|
||||
self.config_path + os.path.sep, self.config, "/dummy/vhostpath")
|
||||
self.assertEqual(parser.root, self.config_path)
|
||||
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ class ParserTest(ApacheTest):
|
||||
with mock.patch("certbot_apache._internal.parser.ApacheParser."
|
||||
"update_runtime_variables"):
|
||||
self.parser = ApacheParser(
|
||||
self.config_path, self.vhost_path, configurator=self.config)
|
||||
self.config_path, self.config, self.vhost_path)
|
||||
|
||||
|
||||
def get_apache_configurator(
|
||||
|
||||
@@ -8,11 +8,11 @@ import tempfile
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
from typing import overload
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import Union
|
||||
|
||||
from certbot_compatibility_test import errors
|
||||
from certbot_compatibility_test import interfaces
|
||||
|
||||
@@ -10,7 +10,6 @@ from typing import IO
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
import warnings
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
@@ -573,6 +572,7 @@ class Client:
|
||||
:param list domains: list of domains to install the certificate
|
||||
:param str privkey_path: path to certificate private key
|
||||
:param str cert_path: certificate file path (optional)
|
||||
:param str fullchain_path: path to the full chain of the certificate
|
||||
:param str chain_path: chain file path
|
||||
|
||||
"""
|
||||
@@ -643,13 +643,13 @@ class Client:
|
||||
"Option %s is not supported by the selected installer. "
|
||||
"Skipping enhancement.", config_name)
|
||||
|
||||
msg = ("We were unable to restart web server")
|
||||
msg = "We were unable to restart web server"
|
||||
if enhanced:
|
||||
with error_handler.ErrorHandler(self._rollback_and_restart, msg):
|
||||
self.installer.restart()
|
||||
|
||||
def apply_enhancement(self, domains: List[str], enhancement: str,
|
||||
options: Optional[Union[List[str], str]] = None) -> None:
|
||||
options: Optional[str] = None) -> None:
|
||||
"""Applies an enhancement on all domains.
|
||||
|
||||
:param list domains: list of ssl_vhosts (as strings)
|
||||
|
||||
@@ -5,13 +5,13 @@ from argparse import ArgumentParser
|
||||
import sys
|
||||
from types import ModuleType
|
||||
from typing import Any
|
||||
from typing import Union
|
||||
from typing import cast
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Type
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Union
|
||||
import warnings
|
||||
|
||||
import zope.interface
|
||||
|
||||
@@ -12,6 +12,8 @@ from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Type
|
||||
from typing import TypeVar
|
||||
from typing import Tuple
|
||||
|
||||
import pkg_resources
|
||||
@@ -244,6 +246,9 @@ class Configurator(Installer, interfaces.Authenticator, metaclass=ABCMeta):
|
||||
"""
|
||||
|
||||
|
||||
GenericAddr = TypeVar("GenericAddr", bound="Addr")
|
||||
|
||||
|
||||
class Addr:
|
||||
r"""Represents an virtual host address.
|
||||
|
||||
@@ -256,7 +261,7 @@ class Addr:
|
||||
self.ipv6 = ipv6
|
||||
|
||||
@classmethod
|
||||
def fromstring(cls, str_addr: str) -> Optional['Addr']:
|
||||
def fromstring(cls: Type[GenericAddr], str_addr: str) -> Optional[GenericAddr]:
|
||||
"""Initialize Addr from string."""
|
||||
if str_addr.startswith('['):
|
||||
# ipv6 addresses starts with [
|
||||
@@ -301,7 +306,7 @@ class Addr:
|
||||
"""Return port."""
|
||||
return self.tup[1]
|
||||
|
||||
def get_addr_obj(self, port: str) -> 'Addr':
|
||||
def get_addr_obj(self: GenericAddr, port: str) -> GenericAddr:
|
||||
"""Return new address object with same addr and new port."""
|
||||
return self.__class__((self.tup[0], port), self.ipv6)
|
||||
|
||||
|
||||
@@ -9,12 +9,12 @@ import sys
|
||||
import tempfile
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Union
|
||||
from typing import cast
|
||||
from typing import IO
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
|
||||
Reference in New Issue
Block a user