Compare commits

...

9 Commits

Author SHA1 Message Date
Adrien Ferrand
57a8401369 Realign coverage 2022-01-19 12:16:31 +01:00
Adrien Ferrand
f0f0b4db08 Final typing fixes 2022-01-19 11:47:34 +01:00
Adrien Ferrand
8840618c2d Merge remote-tracking branch 'upstream/master' into apache_typing 2022-01-19 11:17:26 +01:00
Adrien Ferrand
d1097d476f Merge branch 'master' into pr/9071 2022-01-19 11:12:47 +01:00
Adrien Ferrand
2ef0a702c9 Fixing remaining typing issues 2022-01-19 10:59:56 +01:00
Mads Jensen
f62eabc94e Fix a bit more 2022-01-05 17:49:49 +01:00
Mads Jensen
b1c5362929 Fixing some errors. 2022-01-05 17:49:49 +01:00
Mads Jensen
2c496138bc Fix some things 2022-01-05 17:49:17 +01:00
Mads Jensen
f962a3c613 Add typing to certbot.apache 2022-01-05 17:49:17 +01:00
25 changed files with 730 additions and 559 deletions

View File

@@ -4,6 +4,13 @@ import fnmatch
import logging
import re
import subprocess
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
import pkg_resources
@@ -14,7 +21,7 @@ from certbot.compat import os
logger = logging.getLogger(__name__)
def get_mod_deps(mod_name):
def get_mod_deps(mod_name: str) -> Any:
"""Get known module dependencies.
.. note:: This does not need to be accurate in order for the client to
@@ -33,7 +40,7 @@ def get_mod_deps(mod_name):
return deps.get(mod_name, [])
def get_file_path(vhost_path):
def get_file_path(vhost_path: str) -> Optional[str]:
"""Get file path from augeas_vhost_path.
Takes in Augeas path and returns the file name
@@ -50,7 +57,7 @@ def get_file_path(vhost_path):
return _split_aug_path(vhost_path)[0]
def get_internal_aug_path(vhost_path):
def get_internal_aug_path(vhost_path: str) -> str:
"""Get the Augeas path for a vhost with the file path removed.
:param str vhost_path: Augeas virtual host path
@@ -62,7 +69,7 @@ def get_internal_aug_path(vhost_path):
return _split_aug_path(vhost_path)[1]
def _split_aug_path(vhost_path):
def _split_aug_path(vhost_path: str) -> Tuple[str, str]:
"""Splits an Augeas path into a file path and an internal path.
After removing "/files", this function splits vhost_path into the
@@ -76,7 +83,7 @@ def _split_aug_path(vhost_path):
"""
# Strip off /files
file_path = vhost_path[6:]
internal_path = []
internal_path: List[str] = []
# Remove components from the end of file_path until it becomes valid
while not os.path.exists(file_path):
@@ -86,7 +93,7 @@ def _split_aug_path(vhost_path):
return file_path, "/".join(reversed(internal_path))
def parse_define_file(filepath, varname):
def parse_define_file(filepath: str, varname: str) -> Dict[str, str]:
""" Parses Defines from a variable in configuration file
:param str filepath: Path of file to parse
@@ -96,7 +103,7 @@ def parse_define_file(filepath, varname):
:rtype: `dict`
"""
return_vars = {}
return_vars: Dict[str, str] = {}
# Get list of words in the variable
a_opts = util.get_var_from_file(varname, filepath).split()
for i, v in enumerate(a_opts):
@@ -111,19 +118,19 @@ def parse_define_file(filepath, varname):
return return_vars
def unique_id():
def unique_id() -> str:
""" Returns an unique id to be used as a VirtualHost identifier"""
return binascii.hexlify(os.urandom(16)).decode("utf-8")
def included_in_paths(filepath, paths):
def included_in_paths(filepath: str, paths: Iterable[str]) -> bool:
"""
Returns true if the filepath is included in the list of paths
that may contain full paths or wildcard paths that need to be
expanded.
:param str filepath: Filepath to check
:params list paths: List of paths to check against
:param list paths: List of paths to check against
:returns: True if included
:rtype: bool
@@ -132,7 +139,7 @@ def included_in_paths(filepath, paths):
return any(fnmatch.fnmatch(filepath, path) for path in paths)
def parse_defines(apachectl):
def parse_defines(apachectl: str) -> Dict[str, str]:
"""
Gets Defines from httpd process and returns a dictionary of
the defined variables.
@@ -143,7 +150,7 @@ def parse_defines(apachectl):
:rtype: dict
"""
variables = {}
variables: Dict[str, str] = {}
define_cmd = [apachectl, "-t", "-D",
"DUMP_RUN_CFG"]
matches = parse_from_subprocess(define_cmd, r"Define: ([^ \n]*)")
@@ -161,7 +168,7 @@ def parse_defines(apachectl):
return variables
def parse_includes(apachectl):
def parse_includes(apachectl: str) -> List[str]:
"""
Gets Include directives from httpd process and returns a list of
their values.
@@ -172,11 +179,11 @@ def parse_includes(apachectl):
:rtype: list of str
"""
inc_cmd = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
inc_cmd: List[str] = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
return parse_from_subprocess(inc_cmd, r"\(.*\) (.*)")
def parse_modules(apachectl):
def parse_modules(apachectl: str) -> List[str]:
"""
Get loaded modules from httpd process, and return the list
of loaded module names.
@@ -191,7 +198,7 @@ def parse_modules(apachectl):
return parse_from_subprocess(mod_cmd, r"(.*)_module")
def parse_from_subprocess(command, regexp):
def parse_from_subprocess(command: List[str], regexp: str) -> List[str]:
"""Get values from stdout of subprocess command
:param list command: Command to run
@@ -205,7 +212,7 @@ def parse_from_subprocess(command, regexp):
return re.compile(regexp).findall(stdout)
def _get_runtime_cfg(command):
def _get_runtime_cfg(command: Sequence[str]) -> str:
"""
Get runtime configuration info.
@@ -241,7 +248,7 @@ def _get_runtime_cfg(command):
return stdout
def find_ssl_apache_conf(prefix):
def find_ssl_apache_conf(prefix: str) -> str:
"""
Find a TLS Apache config file in the dedicated storage.
:param str prefix: prefix of the TLS Apache config file to find

View File

@@ -1,4 +1,7 @@
""" apacheconfig implementation of the ParserNode interfaces """
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
from certbot_apache._internal import assertions
@@ -13,20 +16,21 @@ class ApacheParserNode(interfaces.ParserNode):
by parsing the equivalent configuration text using the apacheconfig library.
"""
def __init__(self, **kwargs):
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(
kwargs) # pylint: disable=unused-variable
def __init__(self, **kwargs: Any):
# pylint: disable=unused-variable
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs)
super().__init__(**kwargs)
self.ancestor = ancestor
self.filepath = filepath
self.dirty = dirty
self.metadata = metadata
self._raw = self.metadata["ac_ast"]
self.ancestor: str = ancestor
self.filepath: str = filepath
self.dirty: bool = dirty
self.metadata: Any = metadata
self._raw: Any = self.metadata["ac_ast"]
def save(self, msg): # pragma: no cover
pass
def save(self, msg: str) -> None:
pass # pragma: no cover
def find_ancestors(self, name): # pylint: disable=unused-variable
# pylint: disable=unused-variable
def find_ancestors(self, name: str) -> List["ApacheBlockNode"]:
"""Find ancestor BlockNodes with a given name"""
return [ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -38,33 +42,33 @@ class ApacheParserNode(interfaces.ParserNode):
class ApacheCommentNode(ApacheParserNode):
""" apacheconfig implementation of CommentNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
comment, kwargs = util.commentnode_kwargs(kwargs) # pylint: disable=unused-variable
super().__init__(**kwargs)
self.comment = comment
def __eq__(self, other): # pragma: no cover
def __eq__(self, other: Any):
if isinstance(other, self.__class__):
return (self.comment == other.comment and
self.dirty == other.dirty and
self.ancestor == other.ancestor and
self.metadata == other.metadata and
self.filepath == other.filepath)
return False
return False # pragma: no cover
class ApacheDirectiveNode(ApacheParserNode):
""" apacheconfig implementation of DirectiveNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
name, parameters, enabled, kwargs = util.directivenode_kwargs(kwargs)
super().__init__(**kwargs)
self.name = name
self.parameters = parameters
self.enabled = enabled
self.include = None
self.name: str = name
self.parameters: List[str] = parameters
self.enabled: bool = enabled
self.include: Optional[str] = None
def __eq__(self, other): # pragma: no cover
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -73,21 +77,21 @@ class ApacheDirectiveNode(ApacheParserNode):
self.dirty == other.dirty and
self.ancestor == other.ancestor and
self.metadata == other.metadata)
return False
return False # pragma: no cover
def set_parameters(self, _parameters): # pragma: no cover
def set_parameters(self, _parameters):
"""Sets the parameters for DirectiveNode"""
return
return # pragma: no cover
class ApacheBlockNode(ApacheDirectiveNode):
""" apacheconfig implementation of BlockNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
self.children: Tuple[ApacheParserNode, ...] = ()
def __eq__(self, other): # pragma: no cover
def __eq__(self, other):
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -97,10 +101,12 @@ class ApacheBlockNode(ApacheDirectiveNode):
self.dirty == other.dirty and
self.ancestor == other.ancestor and
self.metadata == other.metadata)
return False
return False # pragma: no cover
# pylint: disable=unused-argument
def add_child_block(self, name, parameters=None, position=None): # pragma: no cover
def add_child_block(
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
) -> "ApacheBlockNode": # pragma: no cover
"""Adds a new BlockNode to the sequence of children"""
new_block = ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -111,7 +117,9 @@ class ApacheBlockNode(ApacheDirectiveNode):
return new_block
# pylint: disable=unused-argument
def add_child_directive(self, name, parameters=None, position=None): # pragma: no cover
def add_child_directive(
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
) -> ApacheDirectiveNode: # pragma: no cover
"""Adds a new DirectiveNode to the sequence of children"""
new_dir = ApacheDirectiveNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -122,7 +130,9 @@ class ApacheBlockNode(ApacheDirectiveNode):
return new_dir
# pylint: disable=unused-argument
def add_child_comment(self, comment="", position=None): # pragma: no cover
def add_child_comment(
self, name: str, parameters: Optional[int] = None, position: Optional[int] = None
) -> ApacheCommentNode: # pragma: no cover
"""Adds a new CommentNode to the sequence of children"""
new_comment = ApacheCommentNode(comment=assertions.PASS,
@@ -132,7 +142,8 @@ class ApacheBlockNode(ApacheDirectiveNode):
self.children += (new_comment,)
return new_comment
def find_blocks(self, name, exclude=True): # pylint: disable=unused-argument
# pylint: disable=unused-argument
def find_blocks(self, name, exclude: bool = True) -> List["ApacheBlockNode"]:
"""Recursive search of BlockNodes from the sequence of children"""
return [ApacheBlockNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -140,7 +151,8 @@ class ApacheBlockNode(ApacheDirectiveNode):
filepath=assertions.PASS,
metadata=self.metadata)]
def find_directives(self, name, exclude=True): # pylint: disable=unused-argument
# pylint: disable=unused-argument
def find_directives(self, name: str, exclude: bool = True) -> List[ApacheDirectiveNode]:
"""Recursive search of DirectiveNodes from the sequence of children"""
return [ApacheDirectiveNode(name=assertions.PASS,
parameters=assertions.PASS,
@@ -149,22 +161,22 @@ class ApacheBlockNode(ApacheDirectiveNode):
metadata=self.metadata)]
# pylint: disable=unused-argument
def find_comments(self, comment, exact=False): # pragma: no cover
def find_comments(self, comment: str, exact: bool = False) -> List[ApacheCommentNode]:
"""Recursive search of DirectiveNodes from the sequence of children"""
return [ApacheCommentNode(comment=assertions.PASS,
return [ApacheCommentNode(comment=assertions.PASS, # pragma: no cover
ancestor=self,
filepath=assertions.PASS,
metadata=self.metadata)]
def delete_child(self, child): # pragma: no cover
def delete_child(self, child: "ApacheBlockNode") -> None:
"""Deletes a ParserNode from the sequence of children"""
return
return # pragma: no cover
def unsaved_files(self): # pragma: no cover
def unsaved_files(self) -> List[str]:
"""Returns a list of unsaved filepaths"""
return [assertions.PASS]
return [assertions.PASS] # pragma: no cover
def parsed_paths(self): # pragma: no cover
def parsed_paths(self) -> List[str]:
"""Returns a list of parsed configuration file paths"""
return [assertions.PASS]

View File

@@ -1,12 +1,13 @@
"""Dual parser node assertions"""
import fnmatch
from typing import Any
from certbot_apache._internal import interfaces
PASS = "CERTBOT_PASS_ASSERT"
def assertEqual(first, second):
def assertEqual(first: Any, second: Any) -> None:
""" Equality assertion """
if isinstance(first, interfaces.CommentNode):
@@ -30,7 +31,8 @@ def assertEqual(first, second):
assert first.filepath == second.filepath
def assertEqualComment(first, second): # pragma: no cover
# pragma: no cover
def assertEqualComment(first: Any, second: Any) -> None:
""" Equality assertion for CommentNode """
assert isinstance(first, interfaces.CommentNode)
@@ -40,7 +42,7 @@ def assertEqualComment(first, second): # pragma: no cover
assert first.comment == second.comment # type: ignore
def _assertEqualDirectiveComponents(first, second): # pragma: no cover
def _assertEqualDirectiveComponents(first: Any, second: Any) -> None: # pragma: no cover
""" Handles assertion for instance variables for DirectiveNode and BlockNode"""
# Enabled value cannot be asserted, because Augeas implementation
@@ -53,7 +55,7 @@ def _assertEqualDirectiveComponents(first, second): # pragma: no cover
assert first.parameters == second.parameters
def assertEqualDirective(first, second):
def assertEqualDirective(first: Any, second: Any) -> None:
""" Equality assertion for DirectiveNode """
assert isinstance(first, interfaces.DirectiveNode)
@@ -61,7 +63,7 @@ def assertEqualDirective(first, second):
_assertEqualDirectiveComponents(first, second)
def isPass(value): # pragma: no cover
def isPass(value) -> bool: # pragma: no cover
"""Checks if the value is set to PASS"""
if isinstance(value, bool):
return True
@@ -115,7 +117,7 @@ def assertEqualSimple(first, second):
assert first == second
def isEqualVirtualHost(first, second):
def isEqualVirtualHost(first, second) -> bool:
"""
Checks that two VirtualHost objects are similar. There are some built
in differences with the implementations: VirtualHost created by ParserNode

View File

@@ -64,7 +64,15 @@ Translates over to:
"/files/etc/apache2/apache2.conf/bLoCk[1]",
]
"""
from typing import Any
from typing import cast
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
from certbot import errors
from certbot.compat import os
@@ -78,14 +86,16 @@ from certbot_apache._internal import parsernode_util as util
class AugeasParserNode(interfaces.ParserNode):
""" Augeas implementation of ParserNode interface """
def __init__(self, **kwargs):
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs) # pylint: disable=unused-variable
def __init__(self, **kwargs: Any):
# pylint: disable=unused-variable
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs)
super().__init__(**kwargs)
self.ancestor = ancestor
self.filepath = filepath
self.dirty = dirty
self.metadata = metadata
self.parser = self.metadata.get("augeasparser")
self.ancestor: str = ancestor
self.filepath: str = filepath
self.dirty: bool = dirty
self.metadata: Dict[str, Any] = metadata
self.parser: parser.ApacheParser = cast(parser.ApacheParser,
self.metadata.get("augeasparser"))
try:
if self.metadata["augeaspath"].endswith("/"):
raise errors.PluginError(
@@ -96,10 +106,10 @@ class AugeasParserNode(interfaces.ParserNode):
except KeyError:
raise errors.PluginError("Augeas path is required")
def save(self, msg):
def save(self, msg: Iterable[str]) -> None:
self.parser.save(msg)
def find_ancestors(self, name):
def find_ancestors(self, name: str) -> List["AugeasBlockNode"]:
"""
Searches for ancestor BlockNodes with a given name.
@@ -109,7 +119,7 @@ class AugeasParserNode(interfaces.ParserNode):
:rtype: list of AugeasBlockNode
"""
ancestors = []
ancestors: List[AugeasBlockNode] = []
parent = self.metadata["augeaspath"]
while True:
@@ -124,7 +134,7 @@ class AugeasParserNode(interfaces.ParserNode):
return ancestors
def _create_blocknode(self, path):
def _create_blocknode(self, path: str) -> "AugeasBlockNode":
"""
Helper function to create a BlockNode from Augeas path. This is used by
AugeasParserNode.find_ancestors and AugeasBlockNode.
@@ -132,21 +142,25 @@ class AugeasParserNode(interfaces.ParserNode):
"""
name = self._aug_get_name(path)
metadata = {"augeasparser": self.parser, "augeaspath": path}
name: str = self._aug_get_name(path)
metadata: Dict[str, Union[parser.ApacheParser, str]] = {
"augeasparser": self.parser, "augeaspath": path
}
# Check if the file was included from the root config or initial state
enabled = self.parser.parsed_in_original(
apache_util.get_file_path(path)
)
file_path = apache_util.get_file_path(path)
if file_path is None:
raise ValueError(f"No file path found for vhost: {path}.") # pragma: no cover
enabled = self.parser.parsed_in_original(file_path)
return AugeasBlockNode(name=name,
enabled=enabled,
ancestor=assertions.PASS,
filepath=apache_util.get_file_path(path),
filepath=file_path,
metadata=metadata)
def _aug_get_name(self, path):
def _aug_get_name(self, path: str) -> str:
"""
Helper function to get name of a configuration block or variable from path.
"""
@@ -166,12 +180,12 @@ class AugeasParserNode(interfaces.ParserNode):
class AugeasCommentNode(AugeasParserNode):
""" Augeas implementation of CommentNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
comment, kwargs = util.commentnode_kwargs(kwargs) # pylint: disable=unused-variable
super().__init__(**kwargs)
self.comment = comment
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.comment == other.comment and
self.filepath == other.filepath and
@@ -184,15 +198,15 @@ class AugeasCommentNode(AugeasParserNode):
class AugeasDirectiveNode(AugeasParserNode):
""" Augeas implementation of DirectiveNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
name, parameters, enabled, kwargs = util.directivenode_kwargs(kwargs)
super().__init__(**kwargs)
self.name = name
self.enabled = enabled
self.name: str = name
self.enabled: bool = enabled
if parameters:
self.set_parameters(parameters)
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -203,7 +217,7 @@ class AugeasDirectiveNode(AugeasParserNode):
self.metadata == other.metadata)
return False
def set_parameters(self, parameters):
def set_parameters(self, parameters: List[str]):
"""
Sets parameters of a DirectiveNode or BlockNode object.
@@ -222,7 +236,7 @@ class AugeasDirectiveNode(AugeasParserNode):
self.parser.aug.set(param_path, param)
@property
def parameters(self):
def parameters(self) -> Tuple[Optional[str], ...]:
"""
Fetches the parameters from Augeas tree, ensuring that the sequence always
represents the current state
@@ -232,7 +246,7 @@ class AugeasDirectiveNode(AugeasParserNode):
"""
return tuple(self._aug_get_params(self.metadata["augeaspath"]))
def _aug_get_params(self, path):
def _aug_get_params(self, path: str) -> List[Optional[str]]:
"""Helper function to get parameters for DirectiveNodes and BlockNodes"""
arg_paths = self.parser.aug.match(path + "/arg")
@@ -242,11 +256,11 @@ class AugeasDirectiveNode(AugeasParserNode):
class AugeasBlockNode(AugeasDirectiveNode):
""" Augeas implementation of BlockNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
self.children = ()
self.children: Tuple["AugeasBlockNode", ...] = ()
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.name == other.name and
self.filepath == other.filepath and
@@ -259,21 +273,24 @@ class AugeasBlockNode(AugeasDirectiveNode):
return False
# pylint: disable=unused-argument
def add_child_block(self, name, parameters=None, position=None): # pragma: no cover
def add_child_block(
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
) -> "AugeasBlockNode": # pragma: no cover
"""Adds a new BlockNode to the sequence of children"""
insertpath, realpath, before = self._aug_resolve_child_position(
name,
position
)
new_metadata = {"augeasparser": self.parser, "augeaspath": realpath}
new_metadata: Dict[str, Any] = {"augeasparser": self.parser, "augeaspath": realpath}
# Create the new block
self.parser.aug.insert(insertpath, name, before)
# Check if the file was included from the root config or initial state
enabled = self.parser.parsed_in_original(
apache_util.get_file_path(realpath)
)
file_path = apache_util.get_file_path(realpath)
if file_path is None:
raise errors.Error(f"No file path found for vhost: {realpath}")
enabled = self.parser.parsed_in_original(file_path)
# Parameters will be set at the initialization of the new object
return AugeasBlockNode(
@@ -281,12 +298,14 @@ class AugeasBlockNode(AugeasDirectiveNode):
parameters=parameters,
enabled=enabled,
ancestor=assertions.PASS,
filepath=apache_util.get_file_path(realpath),
filepath=file_path,
metadata=new_metadata,
)
# pylint: disable=unused-argument
def add_child_directive(self, name, parameters=None, position=None): # pragma: no cover
def add_child_directive(
self, name: str, parameters=None, position=None
) -> "AugeasDirectiveNode": # pragma: no cover
"""Adds a new DirectiveNode to the sequence of children"""
if not parameters:
@@ -303,27 +322,32 @@ class AugeasBlockNode(AugeasDirectiveNode):
# Set the directive key
self.parser.aug.set(realpath, name)
# Check if the file was included from the root config or initial state
enabled = self.parser.parsed_in_original(
apache_util.get_file_path(realpath)
)
file_path = apache_util.get_file_path(realpath)
if file_path is None:
raise errors.Error(f"No file path found for vhost: {realpath}")
enabled = self.parser.parsed_in_original(file_path)
return AugeasDirectiveNode(
name=name,
parameters=parameters,
enabled=enabled,
ancestor=assertions.PASS,
filepath=apache_util.get_file_path(realpath),
filepath=file_path,
metadata=new_metadata,
)
def add_child_comment(self, comment="", position=None):
def add_child_comment(
self, comment: str = "", position: Optional[int] = None
) -> "AugeasCommentNode":
"""Adds a new CommentNode to the sequence of children"""
insertpath, realpath, before = self._aug_resolve_child_position(
"#comment",
position
)
new_metadata = {"augeasparser": self.parser, "augeaspath": realpath}
new_metadata: Dict[str, Any] = {
"augeasparser": self.parser, "augeaspath": realpath,
}
# Create the new comment
self.parser.aug.insert(insertpath, "#comment", before)
@@ -337,11 +361,11 @@ class AugeasBlockNode(AugeasDirectiveNode):
metadata=new_metadata,
)
def find_blocks(self, name, exclude=True):
def find_blocks(self, name: str, exclude: bool = True) -> List["AugeasBlockNode"]:
"""Recursive search of BlockNodes from the sequence of children"""
nodes = []
paths = self._aug_find_blocks(name)
nodes: List["AugeasBlockNode"] = []
paths: Iterable[str] = self._aug_find_blocks(name)
if exclude:
paths = self.parser.exclude_dirs(paths)
for path in paths:
@@ -349,7 +373,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
return nodes
def find_directives(self, name, exclude=True):
def find_directives(self, name: str, exclude: bool = True) -> List["AugeasDirectiveNode"]:
"""Recursive search of DirectiveNodes from the sequence of children"""
nodes = []
@@ -368,14 +392,14 @@ class AugeasBlockNode(AugeasDirectiveNode):
return nodes
def find_comments(self, comment):
def find_comments(self, comment: str) -> List["AugeasCommentNode"]:
"""
Recursive search of DirectiveNodes from the sequence of children.
:param str comment: Comment content to search for.
"""
nodes = []
nodes: List["AugeasCommentNode"] = []
ownpath = self.metadata.get("augeaspath")
comments = self.parser.find_comments(comment, start=ownpath)
@@ -384,7 +408,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
return nodes
def delete_child(self, child):
def delete_child(self, child: "AugeasParserNode") -> None:
"""
Deletes a ParserNode from the sequence of children, and raises an
exception if it's unable to do so.
@@ -397,11 +421,11 @@ class AugeasBlockNode(AugeasDirectiveNode):
"seem to exist.").format(child.metadata["augeaspath"])
)
def unsaved_files(self):
def unsaved_files(self) -> Set[str]:
"""Returns a list of unsaved filepaths"""
return self.parser.unsaved_files()
def parsed_paths(self):
def parsed_paths(self) -> List[str]:
"""
Returns a list of file paths that have currently been parsed into the parser
tree. The returned list may include paths with wildcard characters, for
@@ -412,7 +436,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
:returns: list of file paths of files that have been parsed
"""
res_paths = []
res_paths: List[str] = []
paths = self.parser.existing_paths
for directory in paths:
@@ -421,7 +445,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
return res_paths
def _create_commentnode(self, path):
def _create_commentnode(self, path: str) -> "AugeasCommentNode":
"""Helper function to create a CommentNode from Augeas path"""
comment = self.parser.aug.get(path)
@@ -434,14 +458,16 @@ class AugeasBlockNode(AugeasDirectiveNode):
filepath=apache_util.get_file_path(path),
metadata=metadata)
def _create_directivenode(self, path):
def _create_directivenode(self, path: str) -> "AugeasDirectiveNode":
"""Helper function to create a DirectiveNode from Augeas path"""
name = self.parser.get_arg(path)
metadata = {"augeasparser": self.parser, "augeaspath": path}
metadata: Dict[str, Union[parser.ApacheParser, str]] = {
"augeasparser": self.parser, "augeaspath": path,
}
# Check if the file was included from the root config or initial state
enabled = self.parser.parsed_in_original(
enabled: bool = self.parser.parsed_in_original(
apache_util.get_file_path(path)
)
return AugeasDirectiveNode(name=name,
@@ -450,12 +476,12 @@ class AugeasBlockNode(AugeasDirectiveNode):
filepath=apache_util.get_file_path(path),
metadata=metadata)
def _aug_find_blocks(self, name):
def _aug_find_blocks(self, name: str) -> Set[str]:
"""Helper function to perform a search to Augeas DOM tree to search
configuration blocks with a given name"""
# The code here is modified from configurator.get_virtual_hosts()
blk_paths = set()
blk_paths: Set[str] = set()
for vhost_path in list(self.parser.parser_paths):
paths = self.parser.aug.match(
("/files%s//*[label()=~regexp('%s')]" %
@@ -464,7 +490,8 @@ class AugeasBlockNode(AugeasDirectiveNode):
name.lower() in os.path.basename(path).lower()])
return blk_paths
def _aug_resolve_child_position(self, name, position):
def _aug_resolve_child_position(
self, name: str, position: Optional[int]) -> Tuple[str, str, bool]:
"""
Helper function that iterates through the immediate children and figures
out the insertion path for a new AugeasParserNode.
@@ -489,16 +516,16 @@ class AugeasBlockNode(AugeasDirectiveNode):
"""
# Default to appending
before = False
before: bool = False
all_children = self.parser.aug.match("{}/*".format(
all_children: str = self.parser.aug.match("{}/*".format(
self.metadata["augeaspath"])
)
# Calculate resulting_path
# Augeas indices start at 1. We use counter to calculate the index to
# be used in resulting_path.
counter = 1
counter: int = 1
for i, child in enumerate(all_children):
if position is not None and i >= position:
# We're not going to insert the new node to an index after this
@@ -507,7 +534,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
if name == childname:
counter += 1
resulting_path = "{}/{}[{}]".format(
resulting_path: str = "{}/{}[{}]".format(
self.metadata["augeaspath"],
name,
counter

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,6 @@
"""Apache plugin constants."""
from typing import List, Dict
import pkg_resources
from certbot.compat import os
@@ -13,7 +15,7 @@ UPDATED_MOD_SSL_CONF_DIGEST = ".updated-options-ssl-apache-conf-digest.txt"
in `certbot.configuration.NamespaceConfig.config_dir`."""
# NEVER REMOVE A SINGLE HASH FROM THIS LIST UNLESS YOU KNOW EXACTLY WHAT YOU ARE DOING!
ALL_SSL_OPTIONS_HASHES = [
ALL_SSL_OPTIONS_HASHES: List[str] = [
'2086bca02db48daf93468332543c60ac6acdb6f0b58c7bfdf578a5d47092f82a',
'4844d36c9a0f587172d9fa10f4f1c9518e3bcfa1947379f155e16a70a728c21a',
'5a922826719981c0a234b1fbcd495f3213e49d2519e845ea0748ba513044b65b',
@@ -36,39 +38,39 @@ AUGEAS_LENS_DIR = pkg_resources.resource_filename(
"certbot_apache", os.path.join("_internal", "augeas_lens"))
"""Path to the Augeas lens directory"""
REWRITE_HTTPS_ARGS = [
REWRITE_HTTPS_ARGS: List[str] = [
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,NE,R=permanent]"]
"""Apache version<2.3.9 rewrite rule arguments used for redirections to
https vhost"""
REWRITE_HTTPS_ARGS_WITH_END = [
REWRITE_HTTPS_ARGS_WITH_END: List[str] = [
"^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[END,NE,R=permanent]"]
"""Apache version >= 2.3.9 rewrite rule arguments used for redirections to
https vhost"""
OLD_REWRITE_HTTPS_ARGS = [
OLD_REWRITE_HTTPS_ARGS: List[List[str]] = [
["^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[L,QSA,R=permanent]"],
["^", "https://%{SERVER_NAME}%{REQUEST_URI}", "[END,QSA,R=permanent]"]]
HSTS_ARGS = ["always", "set", "Strict-Transport-Security",
HSTS_ARGS: List[str] = ["always", "set", "Strict-Transport-Security",
"\"max-age=31536000\""]
"""Apache header arguments for HSTS"""
UIR_ARGS = ["always", "set", "Content-Security-Policy",
"upgrade-insecure-requests"]
UIR_ARGS: List[str] = ["always", "set", "Content-Security-Policy", "upgrade-insecure-requests"]
HEADER_ARGS = {"Strict-Transport-Security": HSTS_ARGS,
"Upgrade-Insecure-Requests": UIR_ARGS}
HEADER_ARGS: Dict[str, List[str]] = {
"Strict-Transport-Security": HSTS_ARGS, "Upgrade-Insecure-Requests": UIR_ARGS,
}
AUTOHSTS_STEPS = [60, 300, 900, 3600, 21600, 43200, 86400]
AUTOHSTS_STEPS: List[int] = [60, 300, 900, 3600, 21600, 43200, 86400]
"""AutoHSTS increase steps: 1min, 5min, 15min, 1h, 6h, 12h, 24h"""
AUTOHSTS_PERMANENT = 31536000
AUTOHSTS_PERMANENT: int = 31536000
"""Value for the last max-age of HSTS"""
AUTOHSTS_FREQ = 172800
AUTOHSTS_FREQ: int = 172800
"""Minimum time since last increase to perform a new one: 48h"""
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
MANAGED_COMMENT: str = "DO NOT REMOVE - Managed by Certbot"
MANAGED_COMMENT_ID: str = MANAGED_COMMENT + ", VirtualHost id: {0}"
"""Managed by Certbot comments and the VirtualHost identification template"""

View File

@@ -1,18 +1,23 @@
"""Contains UI methods for Apache operations."""
import logging
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from certbot import errors
from certbot.compat import os
from certbot.display import util as display_util
from certbot_apache._internal import obj
logger = logging.getLogger(__name__)
def select_vhost_multiple(vhosts):
def select_vhost_multiple(vhosts: Optional[List[obj.VirtualHost]]) -> List[obj.VirtualHost]:
"""Select multiple Vhosts to install the certificate for
:param vhosts: Available Apache VirtualHosts
:type vhosts: :class:`list` of type `~obj.Vhost`
:type vhosts: :class:`list` of type `~obj.VirtualHost`
:returns: List of VirtualHosts
:rtype: :class:`list`of type `~obj.Vhost`
@@ -32,7 +37,7 @@ def select_vhost_multiple(vhosts):
return []
def _reversemap_vhosts(names, vhosts):
def _reversemap_vhosts(names: Iterable[str], vhosts: List[obj.VirtualHost]):
"""Helper function for select_vhost_multiple for mapping string
representations back to actual vhost objects"""
return_vhosts = []
@@ -44,9 +49,10 @@ def _reversemap_vhosts(names, vhosts):
return return_vhosts
def select_vhost(domain, vhosts):
def select_vhost(domain: str, vhosts: List[obj.VirtualHost]) -> Optional[obj.VirtualHost]:
"""Select an appropriate Apache Vhost.
:param domain: Domain to select
:param vhosts: Available Apache VirtualHosts
:type vhosts: :class:`list` of type `~obj.Vhost`
@@ -62,7 +68,7 @@ def select_vhost(domain, vhosts):
return None
def _vhost_menu(domain, vhosts):
def _vhost_menu(domain: str, vhosts: List[obj.VirtualHost]) -> Tuple[str, int]:
"""Select an appropriate Apache Vhost.
:param vhosts: Available Apache Virtual Hosts

View File

@@ -1,7 +1,16 @@
""" Dual ParserNode implementation """
from typing import Any
from typing import Callable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from certbot_apache._internal import apacheparser
from certbot_apache._internal import assertions
from certbot_apache._internal import augeasparser
from certbot_apache._internal import interfaces
class DualNodeBase:
@@ -9,12 +18,12 @@ class DualNodeBase:
base class for dual parser interface classes. This class handles runtime
attribute value assertions."""
def save(self, msg): # pragma: no cover
def save(self, msg: str): # pragma: no cover
""" Call save for both parsers """
self.primary.save(msg)
self.secondary.save(msg)
def __getattr__(self, aname):
def __getattr__(self, aname: str) -> Any:
""" Attribute value assertion """
firstval = getattr(self.primary, aname)
secondval = getattr(self.secondary, aname)
@@ -28,11 +37,13 @@ class DualNodeBase:
assertions.assertEqualSimple(firstval, secondval)
return firstval
def find_ancestors(self, name):
def find_ancestors(self, name: str) -> Sequence[interfaces.ParserNode]:
""" Traverses the ancestor tree and returns ancestors matching name """
return self._find_helper(DualBlockNode, "find_ancestors", name)
def _find_helper(self, nodeclass, findfunc, search, **kwargs):
def _find_helper(
self, nodeclass: Callable, findfunc: str, search: str, **kwargs: Any
) -> List[apacheparser.ApacheBlockNode]:
"""A helper for find_* functions. The function specific attributes should
be passed as keyword arguments.
@@ -75,7 +86,7 @@ class DualNodeBase:
class DualCommentNode(DualNodeBase):
""" Dual parser implementation of CommentNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
""" This initialization implementation allows ordinary initialization
of CommentNode objects as well as creating a DualCommentNode object
using precreated or fetched CommentNode objects if provided as optional
@@ -107,7 +118,7 @@ class DualCommentNode(DualNodeBase):
class DualDirectiveNode(DualNodeBase):
""" Dual parser implementation of DirectiveNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
""" This initialization implementation allows ordinary initialization
of DirectiveNode objects as well as creating a DualDirectiveNode object
using precreated or fetched DirectiveNode objects if provided as optional
@@ -118,8 +129,6 @@ class DualDirectiveNode(DualNodeBase):
:param DirectiveNode primary: Primary pre-created DirectiveNode, mainly
used when creating new DualParser nodes using add_* methods.
:param DirectiveNode secondary: Secondary pre-created DirectiveNode
"""
kwargs.setdefault("primary", None)
@@ -132,8 +141,12 @@ class DualDirectiveNode(DualNodeBase):
self.primary = primary
self.secondary = secondary
else:
self.primary = augeasparser.AugeasDirectiveNode(**kwargs)
self.secondary = apacheparser.ApacheDirectiveNode(**kwargs)
self.primary = augeasparser.AugeasDirectiveNode(
**kwargs
)
self.secondary = apacheparser.ApacheDirectiveNode(
**kwargs
)
assertions.assertEqual(self.primary, self.secondary)
@@ -149,7 +162,7 @@ class DualDirectiveNode(DualNodeBase):
class DualBlockNode(DualNodeBase):
""" Dual parser implementation of BlockNode interface """
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
""" This initialization implementation allows ordinary initialization
of BlockNode objects as well as creating a DualBlockNode object
using precreated or fetched BlockNode objects if provided as optional
@@ -164,8 +177,8 @@ class DualBlockNode(DualNodeBase):
kwargs.setdefault("primary", None)
kwargs.setdefault("secondary", None)
primary = kwargs.pop("primary")
secondary = kwargs.pop("secondary")
primary: Optional[augeasparser.AugeasBlockNode] = kwargs.pop("primary")
secondary: Optional[apacheparser.ApacheBlockNode] = kwargs.pop("secondary")
if primary or secondary:
assert primary and secondary
@@ -177,7 +190,9 @@ class DualBlockNode(DualNodeBase):
assertions.assertEqual(self.primary, self.secondary)
def add_child_block(self, name, parameters=None, position=None):
def add_child_block(
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
) -> "DualBlockNode":
""" Creates a new child BlockNode, asserts that both implementations
did it in a similar way, and returns a newly created DualBlockNode object
encapsulating both of the newly created objects """
@@ -187,7 +202,9 @@ class DualBlockNode(DualNodeBase):
assertions.assertEqual(primary_new, secondary_new)
return DualBlockNode(primary=primary_new, secondary=secondary_new)
def add_child_directive(self, name, parameters=None, position=None):
def add_child_directive(
self, name: str, parameters: Optional[str] = None, position: Optional[int] = None
) -> DualDirectiveNode:
""" Creates a new child DirectiveNode, asserts that both implementations
did it in a similar way, and returns a newly created DualDirectiveNode
object encapsulating both of the newly created objects """
@@ -197,17 +214,23 @@ class DualBlockNode(DualNodeBase):
assertions.assertEqual(primary_new, secondary_new)
return DualDirectiveNode(primary=primary_new, secondary=secondary_new)
def add_child_comment(self, comment="", position=None):
def add_child_comment(
self, comment: str = "", position: Optional[int] = None
) -> DualCommentNode:
""" Creates a new child CommentNode, asserts that both implementations
did it in a similar way, and returns a newly created DualCommentNode
object encapsulating both of the newly created objects """
primary_new = self.primary.add_child_comment(comment, position)
secondary_new = self.secondary.add_child_comment(comment, position)
primary_new = self.primary.add_child_comment(comment=comment, position=position)
secondary_new = self.secondary.add_child_comment(name=comment, position=position)
assertions.assertEqual(primary_new, secondary_new)
return DualCommentNode(primary=primary_new, secondary=secondary_new)
def _create_matching_list(self, primary_list, secondary_list):
def _create_matching_list(
self,
primary_list: List[interfaces.ParserNode],
secondary_list: List[interfaces.ParserNode],
) -> List[Tuple[interfaces.ParserNode, interfaces.ParserNode]]:
""" Matches the list of primary_list to a list of secondary_list and
returns a list of tuples. This is used to create results for find_
methods.
@@ -234,7 +257,7 @@ class DualBlockNode(DualNodeBase):
raise AssertionError("Could not find a matching node.")
return matched
def find_blocks(self, name, exclude=True):
def find_blocks(self, name: str, exclude: bool = True) -> List[apacheparser.ApacheBlockNode]:
"""
Performs a search for BlockNodes using both implementations and does simple
checks for results. This is built upon the assumption that unimplemented
@@ -246,7 +269,8 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualBlockNode, "find_blocks", name,
exclude=exclude)
def find_directives(self, name, exclude=True):
def find_directives(self, name: str, exclude: bool = True
) -> Sequence[apacheparser.ApacheDirectiveNode]:
"""
Performs a search for DirectiveNodes using both implementations and
checks the results. This is built upon the assumption that unimplemented
@@ -258,7 +282,7 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualDirectiveNode, "find_directives", name,
exclude=exclude)
def find_comments(self, comment):
def find_comments(self, comment: str) -> Sequence[apacheparser.ApacheParserNode]:
"""
Performs a search for CommentNodes using both implementations and
checks the results. This is built upon the assumption that unimplemented
@@ -269,7 +293,7 @@ class DualBlockNode(DualNodeBase):
return self._find_helper(DualCommentNode, "find_comments", comment)
def delete_child(self, child):
def delete_child(self, child: "DualBlockNode"):
"""Deletes a child from the ParserNode implementations. The actual
ParserNode implementations are used here directly in order to be able
to match a child to the list of children."""
@@ -277,7 +301,7 @@ class DualBlockNode(DualNodeBase):
self.primary.delete_child(child.primary)
self.secondary.delete_child(child.secondary)
def unsaved_files(self):
def unsaved_files(self) -> Set[str]:
""" Fetches the list of unsaved file paths and asserts that the lists
match """
primary_files = self.primary.unsaved_files()
@@ -286,7 +310,7 @@ class DualBlockNode(DualNodeBase):
return primary_files
def parsed_paths(self):
def parsed_paths(self) -> List[str]:
"""
Returns a list of file paths that have currently been parsed into the parser
tree. The returned list may include paths with wildcard characters, for

View File

@@ -1,4 +1,7 @@
""" Entry point for Apache Plugin """
from typing import Callable
from typing import Dict
from certbot import util
from certbot_apache._internal import configurator
from certbot_apache._internal import override_arch
@@ -10,7 +13,7 @@ from certbot_apache._internal import override_gentoo
from certbot_apache._internal import override_suse
from certbot_apache._internal import override_void
OVERRIDE_CLASSES = {
OVERRIDE_CLASSES: Dict[str, Callable] = {
"arch": override_arch.ArchConfigurator,
"cloudlinux": override_centos.CentOSConfigurator,
"darwin": override_darwin.DarwinConfigurator,

View File

@@ -1,11 +1,14 @@
"""A class that performs HTTP-01 challenges for Apache"""
import errno
import logging
from typing import Any
from typing import List
from typing import Set
from typing import TYPE_CHECKING
from acme.challenges import HTTP01Response
from certbot import errors
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge
from certbot.compat import filesystem
from certbot.compat import os
from certbot.plugins import common
@@ -64,7 +67,7 @@ class ApacheHttp01(common.ChallengePerformer):
"http_challenges")
self.moded_vhosts: Set[VirtualHost] = set()
def perform(self):
def perform(self) -> List[KeyAuthorizationAnnotatedChallenge]:
"""Perform all HTTP-01 challenges."""
if not self.achalls:
return []
@@ -72,8 +75,7 @@ class ApacheHttp01(common.ChallengePerformer):
# About to make temporary changes to the config
self.configurator.save("Changes before challenge setup", True)
self.configurator.ensure_listen(str(
self.configurator.config.http01_port))
self.configurator.ensure_listen(str(self.configurator.config.http01_port))
self.prepare_http01_modules()
responses = self._set_up_challenges()
@@ -84,7 +86,7 @@ class ApacheHttp01(common.ChallengePerformer):
return responses
def prepare_http01_modules(self):
def prepare_http01_modules(self) -> None:
"""Make sure that we have the needed modules available for http01"""
if self.configurator.conf("handle-modules"):
@@ -97,7 +99,7 @@ class ApacheHttp01(common.ChallengePerformer):
if mod + "_module" not in self.configurator.parser.modules:
self.configurator.enable_mod(mod, temp=True)
def _mod_config(self):
def _mod_config(self) -> None:
selected_vhosts: List[VirtualHost] = []
http_port = str(self.configurator.config.http01_port)
@@ -146,7 +148,7 @@ class ApacheHttp01(common.ChallengePerformer):
with open(self.challenge_conf_post, "w") as new_conf:
new_conf.write(config_text_post)
def _matching_vhosts(self, domain):
def _matching_vhosts(self, domain: str) -> List[VirtualHost]:
"""Return all VirtualHost objects that have the requested domain name or
a wildcard name that would match the domain in ServerName or ServerAlias
directive.
@@ -160,9 +162,9 @@ class ApacheHttp01(common.ChallengePerformer):
return matching_vhosts
def _relevant_vhosts(self):
def _relevant_vhosts(self) -> List[VirtualHost]:
http01_port = str(self.configurator.config.http01_port)
relevant_vhosts = []
relevant_vhosts: List[VirtualHost] = []
for vhost in self.configurator.vhosts:
if any(a.is_wildcard() or a.get_port() == http01_port for a in vhost.addrs):
if not vhost.ssl:
@@ -180,7 +182,7 @@ class ApacheHttp01(common.ChallengePerformer):
"""Return all VirtualHost objects with no ServerName"""
return [vh for vh in self.configurator.vhosts if vh.name is None]
def _set_up_challenges(self):
def _set_up_challenges(self) -> List[HTTP01Response]:
if not os.path.isdir(self.challenge_dir):
old_umask = filesystem.umask(0o022)
try:
@@ -198,10 +200,12 @@ class ApacheHttp01(common.ChallengePerformer):
return responses
def _set_up_challenge(self, achall):
def _set_up_challenge(self, achall: KeyAuthorizationAnnotatedChallenge) -> HTTP01Response:
response: HTTP01Response
validation: Any
response, validation = achall.response_and_validation()
name = os.path.join(self.challenge_dir, achall.chall.encode("token"))
name: str = os.path.join(self.challenge_dir, achall.chall.encode("token"))
self.configurator.reverter.register_file_creation(True, name)
with open(name, 'wb') as f:
@@ -210,7 +214,7 @@ class ApacheHttp01(common.ChallengePerformer):
return response
def _set_up_include_directives(self, vhost):
def _set_up_include_directives(self, vhost: VirtualHost) -> None:
"""Includes override configuration to the beginning and to the end of
VirtualHost. Note that this include isn't added to Augeas search tree"""

View File

@@ -100,6 +100,9 @@ For this reason the internal representation of data should not ignore the case.
"""
import abc
from typing import Any
from typing import List
from typing import Optional
class ParserNode(metaclass=abc.ABCMeta):
@@ -146,7 +149,7 @@ class ParserNode(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
"""
Initializes the ParserNode instance, and sets the ParserNode specific
instance variables. This is not meant to be used directly, but through
@@ -170,7 +173,7 @@ class ParserNode(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def save(self, msg):
def save(self, msg: str) -> None:
"""
Save traverses the children, and attempts to write the AST to disk for
all the objects that are marked dirty. The actual operation of course
@@ -189,7 +192,7 @@ class ParserNode(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def find_ancestors(self, name):
def find_ancestors(self, name: str):
"""
Traverses the ancestor tree up, searching for BlockNodes with a specific
name.
@@ -220,7 +223,7 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any):
"""
Initializes the CommentNode instance and sets its instance variables.
@@ -238,10 +241,12 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
created or changed after the last save. Default: False.
:type dirty: bool
"""
super().__init__(ancestor=kwargs['ancestor'],
super().__init__( # pragma: no cover
ancestor=kwargs['ancestor'],
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
metadata=kwargs.get('metadata', {}),
)
class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
@@ -272,7 +277,7 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any) -> None:
"""
Initializes the DirectiveNode instance and sets its instance variables.
@@ -302,13 +307,15 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
:type enabled: bool
"""
super().__init__(ancestor=kwargs['ancestor'],
super().__init__( # pragma: no cover
ancestor=kwargs['ancestor'],
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
metadata=kwargs.get('metadata', {}),
)
@abc.abstractmethod
def set_parameters(self, parameters):
def set_parameters(self, parameters: List[str]) -> None:
"""
Sets the sequence of parameters for this ParserNode object without
whitespaces. While the whitespaces for parameters are discarded when using
@@ -361,7 +368,9 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def add_child_block(self, name, parameters=None, position=None):
def add_child_block(
self, name: str, parameters: List[str] = None, position: int = None
) -> "BlockNode":
"""
Adds a new BlockNode child node with provided values and marks the callee
BlockNode dirty. This is used to add new children to the AST. The preceding
@@ -381,7 +390,9 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def add_child_directive(self, name, parameters=None, position=None):
def add_child_directive(
self, name: str, parameters: Optional[List[str]] = None, position: Optional[int] = None
) -> "DirectiveNode":
"""
Adds a new DirectiveNode child node with provided values and marks the
callee BlockNode dirty. This is used to add new children to the AST. The
@@ -402,7 +413,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def add_child_comment(self, comment="", position=None):
def add_child_comment(self, comment: str = "", position: Optional[int] = None) -> "CommentNode":
"""
Adds a new CommentNode child node with provided value and marks the
callee BlockNode dirty. This is used to add new children to the AST. The
@@ -422,7 +433,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def find_blocks(self, name, exclude=True):
def find_blocks(self, name: str, exclude: bool = True) -> List["BlockNode"]:
"""
Find a configuration block by name. This method walks the child tree of
ParserNodes under the instance it was called from. This way it is possible
@@ -439,7 +450,23 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def find_directives(self, name, exclude=True):
def find_comments(self, comment: str) -> List["CommentNode"]:
"""
Find comments with value containing the search term.
This method walks the child tree of ParserNodes under the instance it was
called from. This way it is possible to search for the whole configuration
tree, when starting from root node, or to do a partial search when starting
from a specified branch. The lookup should be case sensitive.
:param str comment: The content of comment to search for
:returns: A list of found CommentNode objects.
"""
@abc.abstractmethod
def find_directives(self, name: str, exclude: bool = True):
"""
Find a directive by name. This method walks the child tree of ParserNodes
under the instance it was called from. This way it is possible to search
@@ -457,23 +484,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def find_comments(self, comment):
"""
Find comments with value containing the search term.
This method walks the child tree of ParserNodes under the instance it was
called from. This way it is possible to search for the whole configuration
tree, when starting from root node, or to do a partial search when starting
from a specified branch. The lookup should be case sensitive.
:param str comment: The content of comment to search for
:returns: A list of found CommentNode objects.
"""
@abc.abstractmethod
def delete_child(self, child):
def delete_child(self, child: "ParserNode") -> None:
"""
Remove a specified child node from the list of children of the called
BlockNode object.
@@ -483,7 +494,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def unsaved_files(self):
def unsaved_files(self) -> List[str]:
"""
Returns a list of file paths that have been changed since the last save
(or the initial configuration parse). The intended use for this method
@@ -496,7 +507,7 @@ class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def parsed_paths(self):
def parsed_paths(self) -> List[str]:
"""
Returns a list of file paths that have currently been parsed into the parser
tree. The returned list may include paths with wildcard characters, for

View File

@@ -1,14 +1,19 @@
"""Module contains classes used by the Apache Configurator."""
import re
from typing import Any
from typing import Iterable
from typing import Optional
from typing import Pattern
from typing import Set
from certbot.plugins import common
from certbot_apache._internal import interfaces
class Addr(common.Addr):
"""Represents an Apache address."""
def __eq__(self, other):
def __eq__(self, other: Any):
"""This is defined as equivalent within Apache.
ip_addr:* == ip_addr
@@ -28,12 +33,12 @@ class Addr(common.Addr):
# __cmp__ is overridden. See https://bugs.python.org/issue2235
return super().__hash__()
def _addr_less_specific(self, addr):
def _addr_less_specific(self, addr: "Addr") -> bool:
"""Returns if addr.get_addr() is more specific than self.get_addr()."""
# pylint: disable=protected-access
return addr._rank_specific_addr() > self._rank_specific_addr()
def _rank_specific_addr(self):
def _rank_specific_addr(self) -> int:
"""Returns numerical rank for get_addr()
:returns: 2 - FQ, 1 - wildcard, 0 - _default_
@@ -46,7 +51,7 @@ class Addr(common.Addr):
return 1
return 2
def conflicts(self, addr):
def conflicts(self, addr: "Addr") -> bool:
r"""Returns if address could conflict with correct function of self.
Could addr take away service provided by self within Apache?
@@ -74,11 +79,11 @@ class Addr(common.Addr):
return True
return False
def is_wildcard(self):
def is_wildcard(self) -> bool:
"""Returns if address has a wildcard port."""
return self.tup[1] == "*" or not self.tup[1]
def get_sni_addr(self, port):
def get_sni_addr(self, port: str) -> common.Addr:
"""Returns the least specific address that resolves on the port.
Examples:
@@ -118,13 +123,16 @@ class VirtualHost:
"""
# ?: is used for not returning enclosed characters
strip_name = re.compile(r"^(?:.+://)?([^ :$]*)")
strip_name: Pattern = re.compile(r"^(?:.+://)?([^ :$]*)")
def __init__(self, filep, path, addrs, ssl, enabled, name=None,
aliases=None, modmacro=False, ancestor=None, node=None):
def __init__(
self, filepath: str, path: str, addrs: Set["Addr"],
ssl: bool, enabled: bool, name: Optional[str] = None,
aliases: Optional[Set[str]] = None, modmacro: bool = False,
ancestor: Optional["VirtualHost"] = None, node = None):
"""Initialize a VH."""
self.filep = filep
self.filep = filepath
self.path = path
self.addrs = addrs
self.name = name
@@ -133,9 +141,9 @@ class VirtualHost:
self.enabled = enabled
self.modmacro = modmacro
self.ancestor = ancestor
self.node = node
self.node: interfaces.BlockNode = node
def get_names(self):
def get_names(self) -> Set[str]:
"""Return a set of all names."""
all_names: Set[str] = set()
all_names.update(self.aliases)
@@ -157,7 +165,7 @@ class VirtualHost:
f"mod_macro Vhost: {'Yes' if self.modmacro else 'No'}"
)
def display_repr(self):
def display_repr(self) -> str:
"""Return a representation of VHost to be used in dialog"""
return (
f"File: {self.filep}\n"
@@ -166,7 +174,7 @@ class VirtualHost:
f"HTTPS: {'Yes' if self.ssl else 'No'}\n"
)
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
return (self.filep == other.filep and self.path == other.path and
self.addrs == other.addrs and
@@ -182,7 +190,7 @@ class VirtualHost:
tuple(self.addrs), tuple(self.get_names()),
self.ssl, self.enabled, self.modmacro))
def conflicts(self, addrs):
def conflicts(self, addrs: Iterable[Addr]) -> bool:
"""See if vhost conflicts with any of the addrs.
This determines whether or not these addresses would/could overwrite
@@ -201,7 +209,7 @@ class VirtualHost:
return True
return False
def same_server(self, vhost, generic=False):
def same_server(self, vhost: "VirtualHost", generic: bool = False) -> bool:
"""Determines if the vhost is the same 'server'.
Used in redirection - indicates whether or not the two virtual hosts

View File

@@ -1,5 +1,6 @@
""" Distribution specific override class for CentOS family (RHEL, Fedora) """
import logging
from typing import Any
from typing import cast
from typing import List
@@ -30,7 +31,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
challenge_location="/etc/httpd/conf.d",
)
def config_test(self):
def config_test(self) -> None:
"""
Override config_test to mitigate configtest error in vanilla installation
of mod_ssl in Fedora. The error is caused by non-existent self-signed
@@ -49,7 +50,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
else:
raise
def _try_restart_fedora(self):
def _try_restart_fedora(self) -> None:
"""
Tries to restart httpd using systemctl to generate the self signed key pair.
"""
@@ -62,7 +63,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
# Finish with actual config check to see if systemctl restart helped
super().config_test()
def _prepare_options(self):
def _prepare_options(self) -> None:
"""
Override the options dictionary initialization in order to support
alternative restart cmd used in CentOS.
@@ -72,13 +73,12 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
raise ValueError("OS option restart_cmd_alt must be set for CentOS.")
self.options.restart_cmd_alt[0] = self.options.ctl
def get_parser(self):
def get_parser(self) -> "CentOSParser":
"""Initializes the ApacheParser"""
return CentOSParser(
self.options.server_root, self.options.vhost_root,
self.version, configurator=self)
self.options.server_root, self, self.options.vhost_root, self.version)
def _deploy_cert(self, *args, **kwargs): # pylint: disable=arguments-differ
def _deploy_cert(self, *args: Any, **kwargs: Any): # pylint: disable=arguments-differ
"""
Override _deploy_cert in order to ensure that the Apache configuration
has "LoadModule ssl_module..." before parsing the VirtualHost configuration
@@ -88,7 +88,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
if self.version < (2, 4, 0):
self._deploy_loadmodule_ssl_if_needed()
def _deploy_loadmodule_ssl_if_needed(self):
def _deploy_loadmodule_ssl_if_needed(self) -> None:
"""
Add "LoadModule ssl_module <pre-existing path>" to main httpd.conf if
it doesn't exist there already.
@@ -110,14 +110,13 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
"use, and run Certbot again.")
raise MisconfigurationError(msg)
else:
loadmod_args = path_args
loadmod_args = [arg for arg in path_args if arg]
centos_parser: CentOSParser = cast(CentOSParser, self.parser)
if centos_parser.not_modssl_ifmodule(noarg_path):
if centos_parser.loc["default"] in noarg_path:
# LoadModule already in the main configuration file
if ("ifmodule/" in noarg_path.lower() or
"ifmodule[1]" in noarg_path.lower()):
if "ifmodule/" in noarg_path.lower() or "ifmodule[1]" in noarg_path.lower():
# It's the first or only IfModule in the file
return
# Populate the list of known !mod_ssl.c IfModules
@@ -148,8 +147,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
self.parser.aug.remove(loadmod_path)
# Create a new IfModule !mod_ssl.c if not already found on path
ssl_ifmod = self.parser.get_ifmod(nodir_path, "!mod_ssl.c",
beginning=True)[:-1]
ssl_ifmod = self.parser.get_ifmod(nodir_path, "!mod_ssl.c", beginning=True)[:-1]
if ssl_ifmod not in correct_ifmods:
self.parser.add_dir(ssl_ifmod, "LoadModule", loadmod_args)
correct_ifmods.append(ssl_ifmod)
@@ -159,24 +157,24 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
class CentOSParser(parser.ApacheParser):
"""CentOS specific ApacheParser override class"""
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
# CentOS specific configuration file for Apache
self.sysconfig_filep = "/etc/sysconfig/httpd"
self.sysconfig_filep: str = "/etc/sysconfig/httpd"
super().__init__(*args, **kwargs)
def update_runtime_variables(self):
def update_runtime_variables(self) -> None:
""" Override for update_runtime_variables for custom parsing """
# Opportunistic, works if SELinux not enforced
super().update_runtime_variables()
self.parse_sysconfig_var()
def parse_sysconfig_var(self):
def parse_sysconfig_var(self) -> None:
""" Parses Apache CLI options from CentOS configuration file """
defines = apache_util.parse_define_file(self.sysconfig_filep, "OPTIONS")
for k, v in defines.items():
self.variables[k] = v
def not_modssl_ifmodule(self, path):
def not_modssl_ifmodule(self, path: str) -> bool:
"""Checks if the provided Augeas path has argument !mod_ssl"""
if "ifmodule" not in path.lower():

View File

@@ -8,6 +8,7 @@ from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import configurator
from certbot_apache._internal.configurator import OsOptions
from certbot_apache._internal.obj import VirtualHost
logger = logging.getLogger(__name__)
@@ -22,7 +23,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
handle_sites=True,
)
def enable_site(self, vhost):
def enable_site(self, vhost: VirtualHost) -> None:
"""Enables an available site, Apache reload required.
.. note:: Does not make sure that the site correctly works or that all
@@ -67,7 +68,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
self.save_notes += "Enabled site %s\n" % vhost.filep
return None
def enable_mod(self, mod_name, temp=False):
def enable_mod(self, mod_name: str, temp: bool = False) -> None:
"""Enables module in Apache.
Both enables and reloads Apache so module is active.
@@ -113,7 +114,7 @@ class DebianConfigurator(configurator.ApacheConfigurator):
# Reload is not necessary as DUMP_RUN_CFG uses latest config.
self.parser.update_runtime_variables()
def _enable_mod_debian(self, mod_name, temp):
def _enable_mod_debian(self, mod_name: str, temp: bool) -> None:
"""Assumes mods-available, mods-enabled layout."""
# Generate reversal command.
# Try to be safe here... check that we can probably reverse before
@@ -124,6 +125,5 @@ class DebianConfigurator(configurator.ApacheConfigurator):
"Unable to find a2dismod, please make sure a2enmod and "
"a2dismod are configured correctly for certbot.")
self.reverter.register_undo_command(
temp, [self.options.dismod, "-f", mod_name])
self.reverter.register_undo_command(temp, [self.options.dismod, "-f", mod_name])
util.run_script([self.options.enmod, mod_name])

View File

@@ -23,7 +23,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
challenge_location="/etc/httpd/conf.d",
)
def config_test(self):
def config_test(self) -> None:
"""
Override config_test to mitigate configtest error in vanilla installation
of mod_ssl in Fedora. The error is caused by non-existent self-signed
@@ -35,13 +35,12 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
except errors.MisconfigurationError:
self._try_restart_fedora()
def get_parser(self):
def get_parser(self) -> "FedoraParser":
"""Initializes the ApacheParser"""
return FedoraParser(
self.options.server_root, self.options.vhost_root,
self.version, configurator=self)
self.options.server_root, self, self.options.vhost_root, self.version)
def _try_restart_fedora(self):
def _try_restart_fedora(self) -> None:
"""
Tries to restart httpd using systemctl to generate the self signed key pair.
"""
@@ -53,7 +52,7 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
# Finish with actual config check to see if systemctl restart helped
super().config_test()
def _prepare_options(self):
def _prepare_options(self) -> None:
"""
Override the options dictionary initialization to keep using apachectl
instead of httpd and so take advantages of this new bash script in newer versions
@@ -69,18 +68,18 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
class FedoraParser(parser.ApacheParser):
"""Fedora 29+ specific ApacheParser override class"""
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
# Fedora 29+ specific configuration file for Apache
self.sysconfig_filep = "/etc/sysconfig/httpd"
super().__init__(*args, **kwargs)
def update_runtime_variables(self):
def update_runtime_variables(self) -> None:
""" Override for update_runtime_variables for custom parsing """
# Opportunistic, works if SELinux not enforced
super().update_runtime_variables()
self._parse_sysconfig_var()
def _parse_sysconfig_var(self):
def _parse_sysconfig_var(self) -> None:
""" Parses Apache CLI options from Fedora configuration file """
defines = apache_util.parse_define_file(self.sysconfig_filep, "OPTIONS")
for k, v in defines.items():

View File

@@ -1,4 +1,6 @@
""" Distribution specific override class for Gentoo Linux """
from typing import Any
from certbot_apache._internal import apache_util
from certbot_apache._internal import configurator
from certbot_apache._internal import parser
@@ -16,7 +18,7 @@ class GentooConfigurator(configurator.ApacheConfigurator):
challenge_location="/etc/apache2/vhosts.d",
)
def _prepare_options(self):
def _prepare_options(self) -> None:
"""
Override the options dictionary initialization in order to support
alternative restart cmd used in Gentoo.
@@ -26,33 +28,32 @@ class GentooConfigurator(configurator.ApacheConfigurator):
raise ValueError("OS option restart_cmd_alt must be set for Gentoo.")
self.options.restart_cmd_alt[0] = self.options.ctl
def get_parser(self):
def get_parser(self) -> "GentooParser":
"""Initializes the ApacheParser"""
return GentooParser(
self.options.server_root, self.options.vhost_root,
self.version, configurator=self)
self.options.server_root, self, self.options.vhost_root, self.version)
class GentooParser(parser.ApacheParser):
"""Gentoo specific ApacheParser override class"""
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
# Gentoo specific configuration file for Apache2
self.apacheconfig_filep = "/etc/conf.d/apache2"
super().__init__(*args, **kwargs)
def update_runtime_variables(self):
def update_runtime_variables(self) -> None:
""" Override for update_runtime_variables for custom parsing """
self.parse_sysconfig_var()
self.update_modules()
def parse_sysconfig_var(self):
def parse_sysconfig_var(self) -> None:
""" Parses Apache CLI options from Gentoo configuration file """
defines = apache_util.parse_define_file(self.apacheconfig_filep,
"APACHE2_OPTS")
for k, v in defines.items():
self.variables[k] = v
def update_modules(self):
def update_modules(self) -> None:
"""Get loaded modules from httpd process, and add them to DOM"""
mod_cmd = [self.configurator.options.ctl, "modules"]
matches = apache_util.parse_from_subprocess(mod_cmd, r"(.*)_module")

View File

@@ -3,15 +3,26 @@ import copy
import fnmatch
import logging
import re
from typing import Collection
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Optional
from typing import Pattern
from typing import Set
from typing import TYPE_CHECKING
from typing import Tuple
from typing import Union
from certbot import errors
from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import constants
if TYPE_CHECKING:
from certbot_apache._internal.configurator import ApacheConfigurator # pragma: no cover
try:
from augeas import Augeas
except ImportError: # pragma: no cover
@@ -32,11 +43,11 @@ class ApacheParser:
default - user config file, name - NameVirtualHost,
"""
arg_var_interpreter = re.compile(r"\$\{[^ \}]*}")
fnmatch_chars = {"*", "?", "\\", "[", "]"}
arg_var_interpreter: Pattern = re.compile(r"\$\{[^ \}]*}")
fnmatch_chars: Set[str] = {"*", "?", "\\", "[", "]"}
def __init__(self, root, vhostroot=None, version=(2, 4),
configurator=None):
def __init__(self, root: str, configurator: "ApacheConfigurator",
vhostroot: str, version: Tuple[int, ...] = (2, 4)) -> None:
# Note: Order is important here.
# Needed for calling save() with reverter functionality that resides in
@@ -45,7 +56,7 @@ class ApacheParser:
self.configurator = configurator
# Initialize augeas
self.aug = init_augeas()
self.aug: Augeas = init_augeas()
if not self.check_aug_version():
raise errors.NotSupportedError(
@@ -58,8 +69,8 @@ class ApacheParser:
self.variables: Dict[str, str] = {}
# Find configuration root and make sure augeas can parse it.
self.root = os.path.abspath(root)
self.loc = {"root": self._find_config_root()}
self.root: str = os.path.abspath(root)
self.loc: Dict[str, str] = {"root": self._find_config_root()}
self.parse_file(self.loc["root"])
if version >= (2, 4):
@@ -88,7 +99,7 @@ class ApacheParser:
if self.find_dir("Define", exclude=False):
raise errors.PluginError("Error parsing runtime variables")
def check_parsing_errors(self, lens):
def check_parsing_errors(self, lens: str) -> None:
"""Verify Augeas can parse all of the lens files.
:param str lens: lens to check for errors
@@ -114,7 +125,7 @@ class ApacheParser:
self.aug.get(path + "/message")))
raise errors.PluginError(msg)
def check_aug_version(self):
def check_aug_version(self) -> bool:
""" Checks that we have recent enough version of libaugeas.
If augeas version is recent enough, it will support case insensitive
regexp matching"""
@@ -129,7 +140,7 @@ class ApacheParser:
self.aug.remove("/test/path")
return matches
def unsaved_files(self):
def unsaved_files(self) -> Set[str]:
"""Lists files that have modified Augeas DOM but the changes have not
been written to the filesystem yet, used by `self.save()` and
ApacheConfigurator to check the file state.
@@ -168,7 +179,7 @@ class ApacheParser:
save_files.add(self.aug.get(path)[6:])
return save_files
def ensure_augeas_state(self):
def ensure_augeas_state(self) -> None:
"""Makes sure that all Augeas dom changes are written to files to avoid
loss of configuration directives when doing additional augeas parsing,
causing a possible augeas.load() resulting dom reset
@@ -178,7 +189,7 @@ class ApacheParser:
self.configurator.save_notes += "(autosave)"
self.configurator.save()
def save(self, save_files):
def save(self, save_files: Iterable[str]) -> None:
"""Saves all changes to the configuration files.
save() is called from ApacheConfigurator to handle the parser specific
@@ -197,7 +208,7 @@ class ApacheParser:
self.aug.remove("/files/"+sf)
self.aug.load()
def _log_save_errors(self, ex_errs):
def _log_save_errors(self, ex_errs: List[str]) -> None:
"""Log errors due to bad Augeas save.
:param list ex_errs: Existing errors before save
@@ -211,7 +222,7 @@ class ApacheParser:
# Only new errors caused by recent save
if err not in ex_errs), self.configurator.save_notes)
def add_include(self, main_config, inc_path):
def add_include(self, main_config: str, inc_path: str) -> None:
"""Add Include for a new configuration file if one does not exist
:param str main_config: file path to main Apache config file
@@ -230,21 +241,21 @@ class ApacheParser:
new_file = os.path.basename(inc_path)
self.existing_paths.setdefault(new_dir, []).append(new_file)
def add_mod(self, mod_name):
def add_mod(self, mod_name: str) -> None:
"""Shortcut for updating parser modules."""
if mod_name + "_module" not in self.modules:
self.modules[mod_name + "_module"] = None
if "mod_" + mod_name + ".c" not in self.modules:
self.modules["mod_" + mod_name + ".c"] = None
def reset_modules(self):
def reset_modules(self) -> None:
"""Reset the loaded modules list. This is called from cleanup to clear
temporarily loaded modules."""
self.modules = {}
self.update_modules()
self.parse_modules()
def parse_modules(self):
def parse_modules(self) -> None:
"""Iterates on the configuration until no new modules are loaded.
..todo:: This should be attempted to be done with a binary to avoid
@@ -272,19 +283,18 @@ class ApacheParser:
match_name[6:])
self.modules.update(mods)
def update_runtime_variables(self):
def update_runtime_variables(self) -> None:
"""Update Includes, Defines and Includes from httpd config dump data"""
self.update_defines()
self.update_includes()
self.update_modules()
def update_defines(self):
def update_defines(self) -> None:
"""Updates the dictionary of known variables in the configuration"""
self.variables = apache_util.parse_defines(self.configurator.options.ctl)
def update_includes(self):
def update_includes(self) -> None:
"""Get includes from httpd process, and add them to DOM if needed"""
# Find_dir iterates over configuration for Include and IncludeOptional
@@ -298,28 +308,28 @@ class ApacheParser:
if not self.parsed_in_current(i):
self.parse_file(i)
def update_modules(self):
def update_modules(self) -> None:
"""Get loaded modules from httpd process, and add them to DOM"""
matches = apache_util.parse_modules(self.configurator.options.ctl)
for mod in matches:
self.add_mod(mod.strip())
def filter_args_num(self, matches, args):
def filter_args_num(self, matches: str, args: int) -> List[str]:
"""Filter out directives with specific number of arguments.
This function makes the assumption that all related arguments are given
in order. Thus /files/apache/directive[5]/arg[2] must come immediately
after /files/apache/directive[5]/arg[1]. Runs in 1 linear pass.
:param string matches: Matches of all directives with arg nodes
:param str matches: Matches of all directives with arg nodes
:param int args: Number of args you would like to filter
:returns: List of directives that contain # of arguments.
(arg is stripped off)
"""
filtered = []
filtered: List[str] = []
if args == 1:
for i, match in enumerate(matches):
if match.endswith("/arg"):
@@ -336,7 +346,7 @@ class ApacheParser:
return filtered
def add_dir_to_ifmodssl(self, aug_conf_path, directive, args):
def add_dir_to_ifmodssl(self, aug_conf_path: str, directive: str, args: List[str]) -> None:
"""Adds directive and value to IfMod ssl block.
Adds given directive and value along configuration path within
@@ -362,7 +372,7 @@ class ApacheParser:
for i, arg in enumerate(args):
self.aug.set("%s/arg[%d]" % (nvh_path, i + 1), arg)
def get_ifmod(self, aug_conf_path, mod, beginning=False):
def get_ifmod(self, aug_conf_path: str, mod: str, beginning: bool = False) -> str:
"""Returns the path to <IfMod mod> and creates one if it doesn't exist.
:param str aug_conf_path: Augeas configuration path
@@ -384,7 +394,7 @@ class ApacheParser:
# Strip off "arg" at end of first ifmod path
return if_mods[0].rpartition("arg")[0]
def create_ifmod(self, aug_conf_path, mod, beginning=False):
def create_ifmod(self, aug_conf_path: str, mod: str, beginning: bool = False) -> str:
"""Creates a new <IfMod mod> and returns its path.
:param str aug_conf_path: Augeas configuration path
@@ -411,7 +421,9 @@ class ApacheParser:
self.aug.set(c_path_arg, mod)
return retpath
def add_dir(self, aug_conf_path, directive, args):
def add_dir(
self, aug_conf_path: str, directive: Optional[str], args: Union[List[str], str]
) -> None:
"""Appends directive to the end fo the file given by aug_conf_path.
.. note:: Not added to AugeasConfigurator because it may depend
@@ -431,7 +443,8 @@ class ApacheParser:
else:
self.aug.set(aug_conf_path + "/directive[last()]/arg", args)
def add_dir_beginning(self, aug_conf_path, dirname, args):
def add_dir_beginning(self, aug_conf_path: str, dirname: str,
args: Union[List[str], str]) -> None:
"""Adds the directive to the beginning of defined aug_conf_path.
:param str aug_conf_path: Augeas configuration path to add directive
@@ -452,7 +465,7 @@ class ApacheParser:
else:
self.aug.set(first_dir + "/arg", args)
def add_comment(self, aug_conf_path, comment):
def add_comment(self, aug_conf_path: str, comment: str) -> None:
"""Adds the comment to the augeas path
:param str aug_conf_path: Augeas configuration path to add directive
@@ -461,7 +474,7 @@ class ApacheParser:
"""
self.aug.set(aug_conf_path + "/#comment[last() + 1]", comment)
def find_comments(self, arg, start=None):
def find_comments(self, arg: str, start: Optional[str] = None) -> List[str]:
"""Finds a comment with specified content from the provided DOM path
:param str arg: Comment content to search
@@ -483,7 +496,8 @@ class ApacheParser:
results.append(comment)
return results
def find_dir(self, directive, arg=None, start=None, exclude=True):
def find_dir(self, directive: str, arg: Optional[str] = None,
start: Optional[str] = None, exclude: bool = True) -> List[str]:
"""Finds directive in the configuration.
Recursively searches through config files to find directives
@@ -511,6 +525,8 @@ class ApacheParser:
:param bool exclude: Whether or not to exclude directives based on
variables and enabled modules
:rtype list
"""
# Cannot place member variable in the definition of the function so...
if not start:
@@ -559,7 +575,7 @@ class ApacheParser:
return ordered_matches
def get_all_args(self, match):
def get_all_args(self, match: str) -> List[Optional[str]]:
"""
Tries to fetch all arguments for a directive. See get_arg.
@@ -573,7 +589,7 @@ class ApacheParser:
allargs = self.aug.match(match + '*')
return [self.get_arg(arg) for arg in allargs]
def get_arg(self, match):
def get_arg(self, match: Optional[str]) -> Optional[str]:
"""Uses augeas.get to get argument value and interprets result.
This also converts all variables and parameters appropriately.
@@ -588,6 +604,7 @@ class ApacheParser:
# e.g. strip now, not later
if not value:
return None
value = value.strip("'\"")
variables = ApacheParser.arg_var_interpreter.findall(value)
@@ -601,13 +618,13 @@ class ApacheParser:
return value
def get_root_augpath(self):
def get_root_augpath(self) -> str:
"""
Returns the Augeas path of root configuration.
"""
return get_aug_path(self.loc["root"])
def exclude_dirs(self, matches):
def exclude_dirs(self, matches: Iterable[str]) -> List[str]:
"""Exclude directives that are not loaded into the configuration."""
filters = [("ifmodule", self.modules.keys()), ("ifdefine", self.variables)]
@@ -621,7 +638,7 @@ class ApacheParser:
valid_matches.append(match)
return valid_matches
def _pass_filter(self, match, filter_):
def _pass_filter(self, match: str, filter_: Tuple[str, Collection[str]]) -> bool:
"""Determine if directive passes a filter.
:param str match: Augeas path
@@ -650,7 +667,7 @@ class ApacheParser:
return True
def standard_path_from_server_root(self, arg):
def standard_path_from_server_root(self, arg: str) -> str:
"""Ensure paths are consistent and absolute
:param str arg: Argument of directive
@@ -669,7 +686,7 @@ class ApacheParser:
arg = os.path.normpath(arg)
return arg
def _get_include_path(self, arg):
def _get_include_path(self, arg: Optional[str]) -> Optional[str]:
"""Converts an Apache Include directive into Augeas path.
Converts an Apache Include directive argument into an Augeas
@@ -689,6 +706,8 @@ class ApacheParser:
# if matchObj.group() != arg:
# logger.error("Error: Invalid regexp characters in %s", arg)
# return []
if arg is None:
return None # pragma: no cover
arg = self.standard_path_from_server_root(arg)
# Attempts to add a transform to the file if one does not already exist
@@ -713,7 +732,7 @@ class ApacheParser:
return get_aug_path(arg)
def fnmatch_to_re(self, clean_fn_match):
def fnmatch_to_re(self, clean_fn_match: str) -> str:
"""Method converts Apache's basic fnmatch to regular expression.
Assumption - Configs are assumed to be well-formed and only writable by
@@ -730,7 +749,7 @@ class ApacheParser:
# Since Python 3.6, it returns a different pattern like (?s:.*\.load)\Z
return fnmatch.translate(clean_fn_match)[4:-3] # pragma: no cover
def parse_file(self, filepath):
def parse_file(self, filepath: str) -> None:
"""Parse file with Augeas
Checks to see if file_path is parsed by Augeas
@@ -757,7 +776,7 @@ class ApacheParser:
self._add_httpd_transform(filepath)
self.aug.load()
def parsed_in_current(self, filep):
def parsed_in_current(self, filep: Optional[str]) -> bool:
"""Checks if the file path is parsed by current Augeas parser config
ie. returns True if the file is found on a path that's found in live
Augeas configuration.
@@ -767,9 +786,11 @@ class ApacheParser:
:returns: True if file is parsed in existing configuration tree
:rtype: bool
"""
if not filep:
return False # pragma: no cover
return self._parsed_by_parser_paths(filep, self.parser_paths)
def parsed_in_original(self, filep):
def parsed_in_original(self, filep: Optional[str]) -> bool:
"""Checks if the file path is parsed by existing Apache config.
ie. returns True if the file is found on a path that matches Include or
IncludeOptional statement in the Apache configuration.
@@ -779,9 +800,11 @@ class ApacheParser:
:returns: True if file is parsed in existing configuration tree
:rtype: bool
"""
if not filep:
return False # pragma: no cover
return self._parsed_by_parser_paths(filep, self.existing_paths)
def _parsed_by_parser_paths(self, filep, paths):
def _parsed_by_parser_paths(self, filep: str, paths: Mapping[str, List[str]]) -> bool:
"""Helper function that searches through provided paths and returns
True if file path is found in the set"""
for directory in paths:
@@ -790,7 +813,7 @@ class ApacheParser:
return True
return False
def _check_path_actions(self, filepath):
def _check_path_actions(self, filepath: str) -> Tuple[bool, bool]:
"""Determine actions to take with a new augeas path
This helper function will return a tuple that defines
@@ -815,7 +838,7 @@ class ApacheParser:
remove_old = False
return use_new, remove_old
def _remove_httpd_transform(self, filepath):
def _remove_httpd_transform(self, filepath: str) -> None:
"""Remove path from Augeas transform
:param str filepath: filepath to remove
@@ -830,7 +853,7 @@ class ApacheParser:
self.aug.remove(remove_inc[0])
self.parser_paths.pop(remove_dirname)
def _add_httpd_transform(self, incl):
def _add_httpd_transform(self, incl: str) -> None:
"""Add a transform to Augeas.
This function will correctly add a transform to augeas
@@ -840,7 +863,7 @@ class ApacheParser:
:param str incl: filepath to include for transform
"""
last_include = self.aug.match("/augeas/load/Httpd/incl [last()]")
last_include: str = self.aug.match("/augeas/load/Httpd/incl [last()]")
if last_include:
# Insert a new node immediately after the last incl
self.aug.insert(last_include[0], "incl", False)
@@ -858,7 +881,7 @@ class ApacheParser:
self.parser_paths[os.path.dirname(incl)] = [
os.path.basename(incl)]
def standardize_excl(self):
def standardize_excl(self) -> None:
"""Standardize the excl arguments for the Httpd lens in Augeas.
Note: Hack!
@@ -890,16 +913,16 @@ class ApacheParser:
self.aug.load()
def _set_locations(self):
def _set_locations(self) -> Dict[str, str]:
"""Set default location for directives.
Locations are given as file_paths
.. todo:: Make sure that files are included
"""
default = self.loc["root"]
default: str = self.loc["root"]
temp = os.path.join(self.root, "ports.conf")
temp: str = os.path.join(self.root, "ports.conf")
if os.path.isfile(temp):
listen = temp
name = temp
@@ -909,7 +932,7 @@ class ApacheParser:
return {"default": default, "listen": listen, "name": name}
def _find_config_root(self):
def _find_config_root(self) -> str:
"""Find the Apache Configuration Root file."""
location = ["apache2.conf", "httpd.conf", "conf/httpd.conf"]
for name in location:
@@ -918,7 +941,7 @@ class ApacheParser:
raise errors.NoInstallationError("Could not find configuration root")
def case_i(string):
def case_i(string: str) -> str:
"""Returns case insensitive regex.
Returns a sloppy, but necessary version of a case insensitive regex.
@@ -934,7 +957,7 @@ def case_i(string):
if c.isalpha() else c for c in re.escape(string))
def get_aug_path(file_path):
def get_aug_path(file_path: str) -> str:
"""Return augeas path for full filepath.
:param str file_path: Full filepath

View File

@@ -1,7 +1,12 @@
"""ParserNode utils"""
from typing import Dict
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
def validate_kwargs(kwargs, required_names):
def validate_kwargs(kwargs: Dict[str, Any], required_names: List[str]) -> Dict[str, Any]:
"""
Ensures that the kwargs dict has all the expected values. This function modifies
the kwargs dictionary, and hence the returned dictionary should be used instead
@@ -11,7 +16,7 @@ def validate_kwargs(kwargs, required_names):
:param list required_names: List of required parameter names.
"""
validated_kwargs = {}
validated_kwargs: Dict[str, Any] = {}
for name in required_names:
try:
validated_kwargs[name] = kwargs.pop(name)
@@ -25,7 +30,7 @@ def validate_kwargs(kwargs, required_names):
return validated_kwargs
def parsernode_kwargs(kwargs):
def parsernode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Any, ...]:
"""
Validates keyword arguments for ParserNode. This function modifies the kwargs
dictionary, and hence the returned dictionary should be used instead in the
@@ -55,7 +60,7 @@ def parsernode_kwargs(kwargs):
return kwargs["ancestor"], kwargs["dirty"], kwargs["filepath"], kwargs["metadata"]
def commentnode_kwargs(kwargs):
def commentnode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, str]]:
"""
Validates keyword arguments for CommentNode and sets the default values for
optional kwargs. This function modifies the kwargs dictionary, and hence the
@@ -90,7 +95,7 @@ def commentnode_kwargs(kwargs):
return comment, kwargs
def directivenode_kwargs(kwargs):
def directivenode_kwargs(kwargs: Dict[str, Any]) -> Tuple[Any, Any, Any, Dict]:
"""
Validates keyword arguments for DirectiveNode and BlockNode and sets the
default values for optional kwargs. This function modifies the kwargs

View File

@@ -345,8 +345,8 @@ class ParserInitTest(util.ApacheTest):
self.config.config_test = mock.Mock()
self.assertRaises(
errors.NoInstallationError, ApacheParser,
os.path.relpath(self.config_path), "/dummy/vhostpath",
version=(2, 4, 22), configurator=self.config)
os.path.relpath(self.config_path), self.config,
"/dummy/vhostpath", version=(2, 4, 22))
def test_init_old_aug(self):
from certbot_apache._internal.parser import ApacheParser
@@ -354,8 +354,8 @@ class ParserInitTest(util.ApacheTest):
mock_c.return_value = False
self.assertRaises(
errors.NotSupportedError,
ApacheParser, os.path.relpath(self.config_path),
"/dummy/vhostpath", version=(2, 4, 22), configurator=self.config)
ApacheParser, os.path.relpath(self.config_path), self.config,
"/dummy/vhostpath", version=(2, 4, 22))
@mock.patch("certbot_apache._internal.apache_util._get_runtime_cfg")
def test_unparseable(self, mock_cfg):
@@ -363,8 +363,8 @@ class ParserInitTest(util.ApacheTest):
mock_cfg.return_value = ('Define: TEST')
self.assertRaises(
errors.PluginError,
ApacheParser, os.path.relpath(self.config_path),
"/dummy/vhostpath", version=(2, 2, 22), configurator=self.config)
ApacheParser, os.path.relpath(self.config_path), self.config,
"/dummy/vhostpath", version=(2, 2, 22))
def test_root_normalized(self):
from certbot_apache._internal.parser import ApacheParser
@@ -375,7 +375,7 @@ class ParserInitTest(util.ApacheTest):
self.temp_dir,
"debian_apache_2_4/////multiple_vhosts/../multiple_vhosts/apache2")
parser = ApacheParser(path, "/dummy/vhostpath", configurator=self.config)
parser = ApacheParser(path, self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)
@@ -384,8 +384,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
os.path.relpath(self.config_path),
"/dummy/vhostpath", configurator=self.config)
os.path.relpath(self.config_path), self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)
@@ -394,8 +393,7 @@ class ParserInitTest(util.ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
parser = ApacheParser(
self.config_path + os.path.sep,
"/dummy/vhostpath", configurator=self.config)
self.config_path + os.path.sep, self.config, "/dummy/vhostpath")
self.assertEqual(parser.root, self.config_path)

View File

@@ -73,7 +73,7 @@ class ParserTest(ApacheTest):
with mock.patch("certbot_apache._internal.parser.ApacheParser."
"update_runtime_variables"):
self.parser = ApacheParser(
self.config_path, self.vhost_path, configurator=self.config)
self.config_path, self.config, self.vhost_path)
def get_apache_configurator(

View File

@@ -8,11 +8,11 @@ import tempfile
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union
from typing import overload
from typing import Set
from typing import Tuple
from typing import Type
from typing import Union
from certbot_compatibility_test import errors
from certbot_compatibility_test import interfaces

View File

@@ -10,7 +10,6 @@ from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import warnings
from cryptography.hazmat.backends import default_backend
@@ -573,6 +572,7 @@ class Client:
:param list domains: list of domains to install the certificate
:param str privkey_path: path to certificate private key
:param str cert_path: certificate file path (optional)
:param str fullchain_path: path to the full chain of the certificate
:param str chain_path: chain file path
"""
@@ -643,13 +643,13 @@ class Client:
"Option %s is not supported by the selected installer. "
"Skipping enhancement.", config_name)
msg = ("We were unable to restart web server")
msg = "We were unable to restart web server"
if enhanced:
with error_handler.ErrorHandler(self._rollback_and_restart, msg):
self.installer.restart()
def apply_enhancement(self, domains: List[str], enhancement: str,
options: Optional[Union[List[str], str]] = None) -> None:
options: Optional[str] = None) -> None:
"""Applies an enhancement on all domains.
:param list domains: list of ssl_vhosts (as strings)

View File

@@ -5,13 +5,13 @@ from argparse import ArgumentParser
import sys
from types import ModuleType
from typing import Any
from typing import Union
from typing import cast
from typing import Iterable
from typing import List
from typing import Optional
from typing import Type
from typing import TYPE_CHECKING
from typing import Union
import warnings
import zope.interface

View File

@@ -12,6 +12,8 @@ from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Type
from typing import TypeVar
from typing import Tuple
import pkg_resources
@@ -244,6 +246,9 @@ class Configurator(Installer, interfaces.Authenticator, metaclass=ABCMeta):
"""
GenericAddr = TypeVar("GenericAddr", bound="Addr")
class Addr:
r"""Represents an virtual host address.
@@ -256,7 +261,7 @@ class Addr:
self.ipv6 = ipv6
@classmethod
def fromstring(cls, str_addr: str) -> Optional['Addr']:
def fromstring(cls: Type[GenericAddr], str_addr: str) -> Optional[GenericAddr]:
"""Initialize Addr from string."""
if str_addr.startswith('['):
# ipv6 addresses starts with [
@@ -301,7 +306,7 @@ class Addr:
"""Return port."""
return self.tup[1]
def get_addr_obj(self, port: str) -> 'Addr':
def get_addr_obj(self: GenericAddr, port: str) -> GenericAddr:
"""Return new address object with same addr and new port."""
return self.__class__((self.tup[0], port), self.ipv6)

View File

@@ -9,12 +9,12 @@ import sys
import tempfile
from typing import Any
from typing import Callable
from typing import Union
from typing import cast
from typing import IO
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union
import unittest
import warnings