Compare commits
13 Commits
test-cento
...
dbm_test
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c63607b7f9 | ||
|
|
85187ea90d | ||
|
|
506eac2189 | ||
|
|
899bb15b61 | ||
|
|
35c0d79390 | ||
|
|
ba9de53768 | ||
|
|
e4834da2c1 | ||
|
|
34899c1c14 | ||
|
|
ff5a8a83b4 | ||
|
|
9366633ae6 | ||
|
|
cec96a523c | ||
|
|
c20c722acf | ||
|
|
118bf1d930 |
@@ -1,7 +1,11 @@
|
|||||||
""" Utility functions for certbot-apache plugin """
|
""" Utility functions for certbot-apache plugin """
|
||||||
import binascii
|
import binascii
|
||||||
import os
|
import os
|
||||||
|
import six
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
|
||||||
|
from certbot import crypto_util
|
||||||
from certbot import util
|
from certbot import util
|
||||||
|
|
||||||
def get_mod_deps(mod_name):
|
def get_mod_deps(mod_name):
|
||||||
@@ -104,3 +108,47 @@ def parse_define_file(filepath, varname):
|
|||||||
def unique_id():
|
def unique_id():
|
||||||
""" Returns an unique id to be used as a VirtualHost identifier"""
|
""" Returns an unique id to be used as a VirtualHost identifier"""
|
||||||
return binascii.hexlify(os.urandom(16)).decode("utf-8")
|
return binascii.hexlify(os.urandom(16)).decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def get_apache_ocsp_struct(ttl, ocsp_response):
|
||||||
|
"""Create Apache OCSP response structure to be used in response cache
|
||||||
|
|
||||||
|
:param int ttl: Time-To-Live in seocnds
|
||||||
|
:param str ocsp_response: OCSP response data
|
||||||
|
|
||||||
|
:returns: Apache OCSP structure
|
||||||
|
:rtype: `str`
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
ttl = time.time() + ttl
|
||||||
|
# As microseconds
|
||||||
|
ttl_struct = struct.pack('l', int(ttl*1000000))
|
||||||
|
return b'\x01'.join([ttl_struct, ocsp_response])
|
||||||
|
|
||||||
|
|
||||||
|
def certid_sha1_hex(cert_path):
|
||||||
|
"""Hex representation of certificate SHA1 fingerprint
|
||||||
|
|
||||||
|
:param str cert_path: File path to certificate
|
||||||
|
|
||||||
|
:returns: Hex representation SHA1 fingerprint of certificate
|
||||||
|
:rtype: `str`
|
||||||
|
|
||||||
|
"""
|
||||||
|
sha1_hex = binascii.hexlify(certid_sha1(cert_path))
|
||||||
|
if isinstance(sha1_hex, six.binary_type):
|
||||||
|
return sha1_hex.decode('utf-8') # pragma: no cover
|
||||||
|
return sha1_hex # pragma: no cover
|
||||||
|
|
||||||
|
|
||||||
|
def certid_sha1(cert_path):
|
||||||
|
"""SHA1 fingerprint of certificate
|
||||||
|
|
||||||
|
:param str cert_path: File path to certificate
|
||||||
|
|
||||||
|
:returns: SHA1 fingerprint bytestring
|
||||||
|
:rtype: `str`
|
||||||
|
|
||||||
|
"""
|
||||||
|
return crypto_util.cert_sha1_fingerprint(cert_path)
|
||||||
|
|||||||
@@ -1,11 +1,13 @@
|
|||||||
"""Apache Configuration based off of Augeas Configurator."""
|
"""Apache Configuration based off of Augeas Configurator."""
|
||||||
# pylint: disable=too-many-lines
|
# pylint: disable=too-many-lines
|
||||||
|
import dbm
|
||||||
import copy
|
import copy
|
||||||
import fnmatch
|
import fnmatch
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import pkg_resources
|
import pkg_resources
|
||||||
import re
|
import re
|
||||||
|
import shutil
|
||||||
import six
|
import six
|
||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
@@ -18,12 +20,13 @@ from acme.magic_typing import Any, DefaultDict, Dict, List, Set, Union # pylint
|
|||||||
|
|
||||||
from certbot import errors
|
from certbot import errors
|
||||||
from certbot import interfaces
|
from certbot import interfaces
|
||||||
|
from certbot import ocsp
|
||||||
from certbot import util
|
from certbot import util
|
||||||
|
|
||||||
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
|
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
|
||||||
from certbot.plugins import common
|
from certbot.plugins import common
|
||||||
from certbot.plugins.util import path_surgery
|
from certbot.plugins.util import path_surgery
|
||||||
from certbot.plugins.enhancements import AutoHSTSEnhancement
|
from certbot.plugins.enhancements import AutoHSTSEnhancement, OCSPPrefetchEnhancement
|
||||||
|
|
||||||
from certbot_apache import apache_util
|
from certbot_apache import apache_util
|
||||||
from certbot_apache import augeas_configurator
|
from certbot_apache import augeas_configurator
|
||||||
@@ -36,6 +39,12 @@ from certbot_apache import tls_sni_01
|
|||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
|
try:
|
||||||
|
import dbm.ndbm as dbm # pragma: no cover
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
# dbm.ndbm only available on Python3
|
||||||
|
import dbm # pragma: no cover
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -188,6 +197,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||||||
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
|
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
|
||||||
# Temporary state for AutoHSTS enhancement
|
# Temporary state for AutoHSTS enhancement
|
||||||
self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]]
|
self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]]
|
||||||
|
self._ocsp_prefetch = {} # type: Dict[str, str]
|
||||||
|
self._ocsp_dbm_bsddb = False
|
||||||
|
|
||||||
# These will be set in the prepare function
|
# These will be set in the prepare function
|
||||||
self._prepared = False
|
self._prepared = False
|
||||||
@@ -1692,7 +1703,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||||||
self.parser.find_dir("SSLCertificateKeyFile",
|
self.parser.find_dir("SSLCertificateKeyFile",
|
||||||
lineage.key_path, vhost.path))
|
lineage.key_path, vhost.path))
|
||||||
|
|
||||||
def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
|
def _enable_ocsp_stapling(self, ssl_vhost, unused_options, prefetch=False):
|
||||||
"""Enables OCSP Stapling
|
"""Enables OCSP Stapling
|
||||||
|
|
||||||
In OCSP, each client (e.g. browser) would have to query the
|
In OCSP, each client (e.g. browser) would have to query the
|
||||||
@@ -1713,8 +1724,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||||||
:param unused_options: Not currently used
|
:param unused_options: Not currently used
|
||||||
:type unused_options: Not Available
|
:type unused_options: Not Available
|
||||||
|
|
||||||
:returns: Success, general_vhost (HTTP vhost)
|
:param prefetch: Use OCSP prefetching
|
||||||
:rtype: (bool, :class:`~certbot_apache.obj.VirtualHost`)
|
:type prefetch: bool
|
||||||
|
|
||||||
"""
|
"""
|
||||||
min_apache_ver = (2, 3, 3)
|
min_apache_ver = (2, 3, 3)
|
||||||
@@ -1723,8 +1734,15 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||||||
"Unable to set OCSP directives.\n"
|
"Unable to set OCSP directives.\n"
|
||||||
"Apache version is below 2.3.3.")
|
"Apache version is below 2.3.3.")
|
||||||
|
|
||||||
if "socache_shmcb_module" not in self.parser.modules:
|
if prefetch:
|
||||||
self.enable_mod("socache_shmcb")
|
if "socache_dbm_module" not in self.parser.modules:
|
||||||
|
self.enable_mod("socache_dbm")
|
||||||
|
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||||
|
cache_dir = ["dbm:"+cache_path]
|
||||||
|
else:
|
||||||
|
if "socache_shmcb_module" not in self.parser.modules:
|
||||||
|
self.enable_mod("socache_shmcb")
|
||||||
|
cache_dir = ["shmcb:/var/run/apache2/stapling_cache(128000)"]
|
||||||
|
|
||||||
# Check if there's an existing SSLUseStapling directive on.
|
# Check if there's an existing SSLUseStapling directive on.
|
||||||
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
|
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
|
||||||
@@ -1745,8 +1763,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||||||
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
|
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
|
||||||
|
|
||||||
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
|
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
|
||||||
"SSLStaplingCache",
|
"SSLStaplingCache", cache_dir)
|
||||||
["shmcb:/var/run/apache2/stapling_cache(128000)"])
|
|
||||||
|
|
||||||
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
|
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
|
||||||
ssl_vhost.filep)
|
ssl_vhost.filep)
|
||||||
@@ -2171,7 +2188,17 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
self.config_test()
|
self.config_test()
|
||||||
|
|
||||||
|
if not self._ocsp_prefetch:
|
||||||
|
# Try to populate OCSP prefetch structure from pluginstorage
|
||||||
|
self._ocsp_prefetch_fetch_state()
|
||||||
|
if self._ocsp_prefetch:
|
||||||
|
# OCSP prefetching is enabled, so back up the db
|
||||||
|
self._ocsp_prefetch_backup_db()
|
||||||
self._reload()
|
self._reload()
|
||||||
|
if self._ocsp_prefetch:
|
||||||
|
# Restore the backed up dbm database
|
||||||
|
self._ocsp_prefetch_restore_db()
|
||||||
|
|
||||||
def _reload(self):
|
def _reload(self):
|
||||||
"""Reloads the Apache server.
|
"""Reloads the Apache server.
|
||||||
@@ -2494,5 +2521,201 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||||||
# Update AutoHSTS storage (We potentially removed vhosts from managed)
|
# Update AutoHSTS storage (We potentially removed vhosts from managed)
|
||||||
self._autohsts_save_state()
|
self._autohsts_save_state()
|
||||||
|
|
||||||
|
def _ensure_ocsp_dirs(self):
|
||||||
|
"""Makes sure that the OCSP directory paths exist."""
|
||||||
|
ocsp_work = os.path.join(self.config.work_dir, "ocsp")
|
||||||
|
ocsp_save = os.path.join(self.config.config_dir, "ocsp")
|
||||||
|
for path in [ocsp_work, ocsp_save]:
|
||||||
|
if not os.path.isdir(path):
|
||||||
|
os.makedirs(path)
|
||||||
|
os.chmod(path, 0o755)
|
||||||
|
|
||||||
|
def _ocsp_refresh_if_needed(self, pf_obj):
|
||||||
|
"""Refreshes OCSP response for a certiifcate if it's due
|
||||||
|
|
||||||
|
:param dict pf_obj: OCSP prefetch object from pluginstorage
|
||||||
|
|
||||||
|
:returns: If OCSP response was updated
|
||||||
|
:rtype: bool
|
||||||
|
|
||||||
|
"""
|
||||||
|
ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL
|
||||||
|
if ttl < time.time():
|
||||||
|
self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"])
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _ocsp_refresh(self, cert_path, chain_path):
|
||||||
|
"""Refresh the OCSP response for a certificate
|
||||||
|
|
||||||
|
:param str cert_path: Filesystem path to certificate file
|
||||||
|
:param str chain_path: Filesystem path to certificate chain file
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._ensure_ocsp_dirs()
|
||||||
|
handler = ocsp.OCSPResponseHandler(cert_path, chain_path)
|
||||||
|
ocsp_workfile = os.path.join(
|
||||||
|
self.config.work_dir, "ocsp",
|
||||||
|
apache_util.certid_sha1_hex(cert_path))
|
||||||
|
if handler.ocsp_request_to_file(ocsp_workfile):
|
||||||
|
# Guaranteed good response
|
||||||
|
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache")
|
||||||
|
db = self._ocsp_dbm_open(cache_path)
|
||||||
|
cert_sha = apache_util.certid_sha1(cert_path)
|
||||||
|
db[cert_sha] = self._ocsp_response_dbm(ocsp_workfile)
|
||||||
|
self._ocsp_dbm_close(db)
|
||||||
|
else:
|
||||||
|
logger.warning("Encountered an issue while trying to prefetch OCSP "
|
||||||
|
"response for certificate: %s", cert_path)
|
||||||
|
|
||||||
|
def _ensure_ocsp_prefetch_compatibility(self):
|
||||||
|
"""Make sure that the operating system supports the required libraries
|
||||||
|
to manage Apache DBM files.
|
||||||
|
|
||||||
|
:raises: errors.NotSupportedError
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
import bsddb
|
||||||
|
except ImportError:
|
||||||
|
import dbm
|
||||||
|
if not hasattr(dbm, 'ndbm'):
|
||||||
|
msg = ("Unfortunately your operating system does not have a "
|
||||||
|
"compatible database module available for managing "
|
||||||
|
"Apache cache database.")
|
||||||
|
raise errors.NotSupportedError(msg)
|
||||||
|
|
||||||
|
def _ocsp_dbm_open(self, filepath):
|
||||||
|
"""Helper method to open an DBM file in a way that depends on the platform
|
||||||
|
that Certbot is run on. Returns an open database structure."""
|
||||||
|
try:
|
||||||
|
import bsddb
|
||||||
|
self._ocsp_dbm_bsddb = True
|
||||||
|
cache_path = filepath + ".db"
|
||||||
|
database = bsddb.hashopen(cache_path, 'c')
|
||||||
|
except ImportError:
|
||||||
|
# Python3 doesn't have bsddb module, so we use dbm.ndbm instead
|
||||||
|
import dbm
|
||||||
|
database = dbm.ndbm.open(filepath, 'c')
|
||||||
|
return database
|
||||||
|
|
||||||
|
def _ocsp_dbm_close(self, database):
|
||||||
|
"""Helper method to sync and close a DBM file, in a way required by the
|
||||||
|
used dbm implementation."""
|
||||||
|
if self._ocsp_dbm_bsddb:
|
||||||
|
database.sync()
|
||||||
|
database.close()
|
||||||
|
else:
|
||||||
|
database.close()
|
||||||
|
|
||||||
|
def _ocsp_response_dbm(self, workfile):
|
||||||
|
"""Creates a dbm entry for OCSP response data
|
||||||
|
|
||||||
|
:param str workfile: File path for raw OCSP response
|
||||||
|
|
||||||
|
:returns: OCSP response cache data that Apache can use
|
||||||
|
:rtype: string
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open(workfile, 'rb') as fh:
|
||||||
|
response = fh.read()
|
||||||
|
ttl = constants.OCSP_APACHE_TTL
|
||||||
|
return apache_util.get_apache_ocsp_struct(ttl, response)
|
||||||
|
|
||||||
|
def _ocsp_prefetch_save(self, cert_path, chain_path):
|
||||||
|
"""Saves status of current OCSP prefetch, including the last update
|
||||||
|
time to determine if an update is needed on later run.
|
||||||
|
|
||||||
|
:param str cert_path: Filesystem path to certificate
|
||||||
|
:param str chain_path: Filesystem path to certificate chain file
|
||||||
|
|
||||||
|
"""
|
||||||
|
status = {
|
||||||
|
"lastupdate": time.time(),
|
||||||
|
"cert_path": cert_path,
|
||||||
|
"chain_path": chain_path
|
||||||
|
}
|
||||||
|
cert_id = apache_util.certid_sha1_hex(cert_path)
|
||||||
|
self._ocsp_prefetch[cert_id] = status
|
||||||
|
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
|
||||||
|
self.storage.save()
|
||||||
|
|
||||||
|
def _ocsp_prefetch_fetch_state(self):
|
||||||
|
"""
|
||||||
|
Populates the OCSP prefetch state from the pluginstorage.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch")
|
||||||
|
except KeyError:
|
||||||
|
self._ocsp_prefetch = dict()
|
||||||
|
|
||||||
|
def _ocsp_prefetch_backup_db(self):
|
||||||
|
"""
|
||||||
|
Copies the active dbm file to work directory.
|
||||||
|
"""
|
||||||
|
self._ensure_ocsp_dirs()
|
||||||
|
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||||
|
try:
|
||||||
|
shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp"))
|
||||||
|
except IOError:
|
||||||
|
logger.debug("Encountered an issue while trying to backup OCSP dbm file")
|
||||||
|
|
||||||
|
def _ocsp_prefetch_restore_db(self):
|
||||||
|
"""
|
||||||
|
Restores the active dbm file from work directory.
|
||||||
|
"""
|
||||||
|
self._ensure_ocsp_dirs()
|
||||||
|
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||||
|
work_file_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||||
|
try:
|
||||||
|
shutil.copy2(work_file_path, cache_path)
|
||||||
|
except IOError:
|
||||||
|
logger.debug("Encountered an issue when trying to restore OCSP dbm file")
|
||||||
|
|
||||||
|
def enable_ocsp_prefetch(self, lineage, domains):
|
||||||
|
"""Enable OCSP Stapling and prefetching of the responses.
|
||||||
|
|
||||||
|
In OCSP, each client (e.g. browser) would have to query the
|
||||||
|
OCSP Responder to validate that the site certificate was not revoked.
|
||||||
|
|
||||||
|
Enabling OCSP Stapling, would allow the web-server to query the OCSP
|
||||||
|
Responder, and staple its response to the offered certificate during
|
||||||
|
TLS. i.e. clients would not have to query the OCSP responder.
|
||||||
|
|
||||||
|
"""
|
||||||
|
# Fail early if we are not able to support this
|
||||||
|
self._ensure_ocsp_prefetch_compatibility()
|
||||||
|
prefetch_vhosts = set()
|
||||||
|
for domain in domains:
|
||||||
|
matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
|
||||||
|
# We should be handling only SSL vhosts
|
||||||
|
for vh in matched_vhosts:
|
||||||
|
if vh.ssl:
|
||||||
|
prefetch_vhosts.add(vh)
|
||||||
|
|
||||||
|
if prefetch_vhosts:
|
||||||
|
for vh in prefetch_vhosts:
|
||||||
|
self._enable_ocsp_stapling(vh, None, prefetch=True)
|
||||||
|
self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path)
|
||||||
|
self.restart()
|
||||||
|
logger.warning(apache_util.certid_sha1_hex(lineage.cert_path))
|
||||||
|
self._ocsp_refresh(lineage.cert_path, lineage.chain_path)
|
||||||
|
|
||||||
|
def update_ocsp_prefetch(self, _unused_lineage):
|
||||||
|
"""Checks all certificates that are managed by OCSP prefetch, and
|
||||||
|
refreshes OCSP responses for them if required."""
|
||||||
|
|
||||||
|
self._ocsp_prefetch_fetch_state()
|
||||||
|
if not self._ocsp_prefetch:
|
||||||
|
# No OCSP prefetching enabled for any certificate
|
||||||
|
return
|
||||||
|
|
||||||
|
for _, pf in self._ocsp_prefetch.items():
|
||||||
|
if self._ocsp_refresh_if_needed(pf):
|
||||||
|
# Save the status to pluginstorage
|
||||||
|
self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"])
|
||||||
|
|
||||||
|
|
||||||
AutoHSTSEnhancement.register(ApacheConfigurator) # pylint: disable=no-member
|
AutoHSTSEnhancement.register(ApacheConfigurator) # pylint: disable=no-member
|
||||||
|
OCSPPrefetchEnhancement.register(ApacheConfigurator) # pylint: disable=no-member
|
||||||
|
|||||||
@@ -61,3 +61,9 @@ AUTOHSTS_FREQ = 172800
|
|||||||
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
|
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
|
||||||
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
|
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
|
||||||
"""Managed by Certbot comments and the VirtualHost identification template"""
|
"""Managed by Certbot comments and the VirtualHost identification template"""
|
||||||
|
|
||||||
|
OCSP_APACHE_TTL = 432000
|
||||||
|
"""Apache TTL for OCSP response: 5 days"""
|
||||||
|
|
||||||
|
OCSP_INTERNAL_TTL = 86400
|
||||||
|
"""Internal TTL for OCSP response: 1 day"""
|
||||||
|
|||||||
167
certbot-apache/certbot_apache/tests/ocsp_prefetch_test.py
Normal file
167
certbot-apache/certbot_apache/tests/ocsp_prefetch_test.py
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
"""Test for certbot_apache.configurator OCSP Prefetching functionality"""
|
||||||
|
import os
|
||||||
|
import unittest
|
||||||
|
import mock
|
||||||
|
# six is used in mock.patch()
|
||||||
|
import six # pylint: disable=unused-import
|
||||||
|
|
||||||
|
from certbot_apache.tests import util
|
||||||
|
|
||||||
|
|
||||||
|
class OCSPPrefetchTest(util.ApacheTest):
|
||||||
|
"""Tests for OCSP Prefetch feature"""
|
||||||
|
# pylint: disable=protected-access
|
||||||
|
|
||||||
|
def setUp(self): # pylint: disable=arguments-differ
|
||||||
|
super(OCSPPrefetchTest, self).setUp()
|
||||||
|
|
||||||
|
self.config = util.get_apache_configurator(
|
||||||
|
self.config_path, self.vhost_path, self.config_dir, self.work_dir)
|
||||||
|
|
||||||
|
self.lineage = mock.MagicMock(cert_path="cert", chain_path="chain")
|
||||||
|
self.config.parser.modules.add("headers_module")
|
||||||
|
self.config.parser.modules.add("mod_headers.c")
|
||||||
|
self.config.parser.modules.add("ssl_module")
|
||||||
|
self.config.parser.modules.add("mod_ssl.c")
|
||||||
|
self.config.parser.modules.add("socache_dbm_module")
|
||||||
|
self.config.parser.modules.add("mod_socache_dbm.c")
|
||||||
|
|
||||||
|
self.vh_truth = util.get_vh_truth(
|
||||||
|
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
|
||||||
|
|
||||||
|
def call_mocked(self, func, *args, **kwargs):
|
||||||
|
"""Helper method to call functins with mock stack"""
|
||||||
|
|
||||||
|
ver_path = "certbot_apache.configurator.ApacheConfigurator.get_version"
|
||||||
|
cry_path = "certbot.crypto_util.cert_sha1_fingerprint"
|
||||||
|
|
||||||
|
with mock.patch(ver_path) as mock_ver:
|
||||||
|
mock_ver.return_value = (2, 4, 10)
|
||||||
|
with mock.patch(cry_path) as mock_cry:
|
||||||
|
mock_cry.return_value = b'j\x056\x1f\xfa\x08B\xe8D\xa1Bn\xeb*A\xebWx\xdd\xfe'
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator.enable_mod")
|
||||||
|
def test_ocsp_prefetch_enable_mods(self, mock_enable, _restart):
|
||||||
|
self.config.parser.modules.discard("socache_dbm_module")
|
||||||
|
self.config.parser.modules.discard("mod_socache_dbm.c")
|
||||||
|
self.config.parser.modules.discard("headers_module")
|
||||||
|
self.config.parser.modules.discard("mod_header.c")
|
||||||
|
|
||||||
|
ref_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
|
||||||
|
with mock.patch(ref_path):
|
||||||
|
self.call_mocked(self.config.enable_ocsp_prefetch,
|
||||||
|
self.lineage,
|
||||||
|
["ocspvhost.com"])
|
||||||
|
self.assertTrue(mock_enable.called)
|
||||||
|
self.assertEquals(len(self.config._ocsp_prefetch), 1)
|
||||||
|
|
||||||
|
@mock.patch("certbot_apache.constants.OCSP_INTERNAL_TTL", 0)
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||||
|
def test_ocsp_prefetch_refresh(self, _mock_restart):
|
||||||
|
db_path = os.path.join(self.config_dir, "ocsp", "ocsp_cache")
|
||||||
|
def ocsp_req_mock(workfile):
|
||||||
|
"""Method to mock the OCSP request and write response to file"""
|
||||||
|
with open(workfile, 'w') as fh:
|
||||||
|
fh.write("MOCKRESPONSE")
|
||||||
|
return True
|
||||||
|
|
||||||
|
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||||
|
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||||
|
self.call_mocked(self.config.enable_ocsp_prefetch,
|
||||||
|
self.lineage,
|
||||||
|
["ocspvhost.com"])
|
||||||
|
|
||||||
|
odbm = self.config._ocsp_dbm_open(db_path)
|
||||||
|
self.assertEquals(len(odbm.keys()), 1)
|
||||||
|
# The actual response data is prepended by Apache timestamp
|
||||||
|
self.assertTrue(odbm[odbm.keys()[0]].endswith(b'MOCKRESPONSE'))
|
||||||
|
self.config._ocsp_dbm_close(odbm)
|
||||||
|
|
||||||
|
with mock.patch(ocsp_path, side_effect=ocsp_req_mock) as mock_ocsp:
|
||||||
|
self.call_mocked(self.config.update_ocsp_prefetch, None)
|
||||||
|
self.assertTrue(mock_ocsp.called)
|
||||||
|
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||||
|
def test_ocsp_prefetch_refresh_noop(self, _mock_restart):
|
||||||
|
def ocsp_req_mock(workfile):
|
||||||
|
"""Method to mock the OCSP request and write response to file"""
|
||||||
|
with open(workfile, 'w') as fh:
|
||||||
|
fh.write("MOCKRESPONSE")
|
||||||
|
return True
|
||||||
|
|
||||||
|
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||||
|
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||||
|
self.call_mocked(self.config.enable_ocsp_prefetch,
|
||||||
|
self.lineage,
|
||||||
|
["ocspvhost.com"])
|
||||||
|
self.assertEquals(len(self.config._ocsp_prefetch), 1)
|
||||||
|
refresh_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
|
||||||
|
with mock.patch(refresh_path) as mock_refresh:
|
||||||
|
self.call_mocked(self.config.update_ocsp_prefetch, None)
|
||||||
|
self.assertFalse(mock_refresh.called)
|
||||||
|
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
|
||||||
|
def test_ocsp_prefetch_backup_db(self, _mock_test):
|
||||||
|
db_path = os.path.join(self.config_dir, "ocsp", "ocsp_cache")
|
||||||
|
def ocsp_del_db():
|
||||||
|
"""Side effect of _reload() that deletes the DBM file, like Apache
|
||||||
|
does when restarting"""
|
||||||
|
full_db_path = db_path + ".db"
|
||||||
|
os.remove(full_db_path)
|
||||||
|
self.assertFalse(os.path.isfile(full_db_path))
|
||||||
|
|
||||||
|
self.config._ensure_ocsp_dirs()
|
||||||
|
odbm = self.config._ocsp_dbm_open(db_path)
|
||||||
|
odbm["mock_key"] = b'mock_value'
|
||||||
|
self.config._ocsp_dbm_close(odbm)
|
||||||
|
|
||||||
|
# Mock OCSP prefetch dict to signify that there should be a db
|
||||||
|
self.config._ocsp_prefetch = {"mock": "value"}
|
||||||
|
rel_path = "certbot_apache.configurator.ApacheConfigurator._reload"
|
||||||
|
with mock.patch(rel_path, side_effect=ocsp_del_db) as mock_reload:
|
||||||
|
self.config.restart()
|
||||||
|
|
||||||
|
odbm = self.config._ocsp_dbm_open(db_path)
|
||||||
|
self.assertEquals(odbm["mock_key"], b'mock_value')
|
||||||
|
self.config._ocsp_dbm_close(odbm)
|
||||||
|
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator._reload")
|
||||||
|
def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test):
|
||||||
|
self.config._ensure_ocsp_dirs()
|
||||||
|
log_path = "certbot_apache.configurator.logger.debug"
|
||||||
|
log_string = "Encountered an issue while trying to backup OCSP dbm file"
|
||||||
|
log_string2 = "Encountered an issue when trying to restore OCSP dbm file"
|
||||||
|
self.config._ocsp_prefetch = {"mock": "value"}
|
||||||
|
with mock.patch("shutil.copy2", side_effect=IOError):
|
||||||
|
with mock.patch(log_path) as mock_log:
|
||||||
|
self.config.restart()
|
||||||
|
self.assertTrue(mock_log.called)
|
||||||
|
self.assertEquals(mock_log.call_count, 2)
|
||||||
|
self.assertTrue(log_string in mock_log.call_args_list[0][0][0])
|
||||||
|
self.assertTrue(log_string2 in mock_log.call_args_list[1][0][0])
|
||||||
|
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||||
|
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
|
||||||
|
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||||
|
log_path = "certbot_apache.configurator.logger.warning"
|
||||||
|
with mock.patch(ocsp_path) as mock_ocsp:
|
||||||
|
mock_ocsp.return_value = False
|
||||||
|
with mock.patch(log_path) as mock_log:
|
||||||
|
self.call_mocked(self.config.enable_ocsp_prefetch,
|
||||||
|
self.lineage,
|
||||||
|
["ocspvhost.com"])
|
||||||
|
self.assertTrue(mock_log.called)
|
||||||
|
self.assertTrue(
|
||||||
|
"trying to prefetch OCSP" in mock_log.call_args[0][0])
|
||||||
|
|
||||||
|
@mock.patch("certbot_apache.configurator.ApacheConfigurator._ocsp_refresh_if_needed")
|
||||||
|
def test_ocsp_prefetch_update_noop(self, mock_refresh):
|
||||||
|
self.config.update_ocsp_prefetch(None)
|
||||||
|
self.assertFalse(mock_refresh.called)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main() # pragma: no cover
|
||||||
@@ -60,6 +60,7 @@ CLI_DEFAULTS = dict(
|
|||||||
redirect=None,
|
redirect=None,
|
||||||
auto_hsts=False,
|
auto_hsts=False,
|
||||||
hsts=None,
|
hsts=None,
|
||||||
|
ocsp_prefetch=False,
|
||||||
uir=None,
|
uir=None,
|
||||||
staple=None,
|
staple=None,
|
||||||
strict_permissions=False,
|
strict_permissions=False,
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import six
|
|||||||
import zope.component
|
import zope.component
|
||||||
from cryptography.exceptions import InvalidSignature
|
from cryptography.exceptions import InvalidSignature
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||||
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
|
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
|
||||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
|
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
|
||||||
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
||||||
@@ -224,6 +225,18 @@ def verify_renewable_cert(renewable_cert):
|
|||||||
verify_cert_matches_priv_key(renewable_cert.cert, renewable_cert.privkey)
|
verify_cert_matches_priv_key(renewable_cert.cert, renewable_cert.privkey)
|
||||||
|
|
||||||
|
|
||||||
|
def load_cert(cert_path):
|
||||||
|
"""Reads the certificate PEM file and returns a cryptography.x509 object
|
||||||
|
|
||||||
|
:param str cert_path: Path to the certificate
|
||||||
|
:rtype `cryptography.x509`:
|
||||||
|
:returns: x509 certificate object
|
||||||
|
"""
|
||||||
|
with open(cert_path, 'rb') as fh:
|
||||||
|
cert_pem = fh.read()
|
||||||
|
return x509.load_pem_x509_certificate(cert_pem, default_backend())
|
||||||
|
|
||||||
|
|
||||||
def verify_renewable_cert_sig(renewable_cert):
|
def verify_renewable_cert_sig(renewable_cert):
|
||||||
""" Verifies the signature of a `.storage.RenewableCert` object.
|
""" Verifies the signature of a `.storage.RenewableCert` object.
|
||||||
|
|
||||||
@@ -234,8 +247,7 @@ def verify_renewable_cert_sig(renewable_cert):
|
|||||||
try:
|
try:
|
||||||
with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes]
|
with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes]
|
||||||
chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend())
|
chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend())
|
||||||
with open(renewable_cert.cert, 'rb') as cert_file: # type: IO[bytes]
|
cert = load_cert(renewable_cert.cert)
|
||||||
cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
|
|
||||||
pk = chain.public_key()
|
pk = chain.public_key()
|
||||||
with warnings.catch_warnings():
|
with warnings.catch_warnings():
|
||||||
warnings.simplefilter("ignore")
|
warnings.simplefilter("ignore")
|
||||||
@@ -458,6 +470,13 @@ def sha256sum(filename):
|
|||||||
sha256.update(f.read())
|
sha256.update(f.read())
|
||||||
return sha256.hexdigest()
|
return sha256.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def cert_sha1_fingerprint(cert_path):
|
||||||
|
"""Get sha1 digest of the certificate fingerprint"""
|
||||||
|
cert = load_cert(cert_path)
|
||||||
|
return cert.fingerprint(hashes.SHA1())
|
||||||
|
|
||||||
|
|
||||||
def cert_and_chain_from_fullchain(fullchain_pem):
|
def cert_and_chain_from_fullchain(fullchain_pem):
|
||||||
"""Split fullchain_pem into cert_pem and chain_pem
|
"""Split fullchain_pem into cert_pem and chain_pem
|
||||||
|
|
||||||
|
|||||||
@@ -108,3 +108,12 @@ class ConfigurationError(Error):
|
|||||||
|
|
||||||
class MissingCommandlineFlag(Error):
|
class MissingCommandlineFlag(Error):
|
||||||
"""A command line argument was missing in noninteractive usage"""
|
"""A command line argument was missing in noninteractive usage"""
|
||||||
|
|
||||||
|
# OCSP errors:
|
||||||
|
|
||||||
|
class OCSPRevokedError(Error):
|
||||||
|
"""Certificate is revoked based on the OCSP response"""
|
||||||
|
|
||||||
|
|
||||||
|
class OCSPRequestError(Error):
|
||||||
|
"""Error in OCSP request"""
|
||||||
|
|||||||
@@ -941,6 +941,8 @@ def enhance(config, plugins):
|
|||||||
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
||||||
if not config.chain_path:
|
if not config.chain_path:
|
||||||
config.chain_path = lineage.chain_path
|
config.chain_path = lineage.chain_path
|
||||||
|
if not config.cert_path:
|
||||||
|
config.cert_path = lineage.cert_path
|
||||||
if oldstyle_enh:
|
if oldstyle_enh:
|
||||||
le_client = _init_le_client(config, authenticator=None, installer=installer)
|
le_client = _init_le_client(config, authenticator=None, installer=installer)
|
||||||
le_client.enhance_config(domains, config.chain_path, ask_redirect=False)
|
le_client.enhance_config(domains, config.chain_path, ask_redirect=False)
|
||||||
|
|||||||
122
certbot/ocsp.py
122
certbot/ocsp.py
@@ -1,15 +1,109 @@
|
|||||||
"""Tools for checking certificate revocation."""
|
"""Tools for checking certificate revocation."""
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from subprocess import Popen, PIPE
|
from subprocess import Popen, PIPE
|
||||||
|
|
||||||
|
from certbot import crypto_util
|
||||||
from certbot import errors
|
from certbot import errors
|
||||||
from certbot import util
|
from certbot import util
|
||||||
|
|
||||||
|
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class RevocationChecker(object):
|
class OCSPBase(object):
|
||||||
|
"""Base class for OCSP request operations"""
|
||||||
|
|
||||||
|
def determine_ocsp_server(self, cert_path):
|
||||||
|
"""Extract the OCSP server host from a certificate.
|
||||||
|
|
||||||
|
:param str cert_path: Path to the cert we're checking OCSP for
|
||||||
|
:rtype tuple:
|
||||||
|
:returns: (OCSP server URL or None, OCSP server host or None)
|
||||||
|
|
||||||
|
"""
|
||||||
|
url = self._ocsp_host_from_cert(cert_path)
|
||||||
|
if url:
|
||||||
|
url = url.strip()
|
||||||
|
host = url.partition("://")[2].rstrip("/")
|
||||||
|
if host:
|
||||||
|
return url, host
|
||||||
|
else:
|
||||||
|
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def _ocsp_host_from_cert(self, cert_path):
|
||||||
|
"""Helper method for determine_ocsp_server to read the actual OCSP
|
||||||
|
server information from a certificate file"""
|
||||||
|
cert = crypto_util.load_cert(cert_path)
|
||||||
|
ocsp_authinfo = cert.extensions.get_extension_for_oid(
|
||||||
|
ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
|
||||||
|
for obj in ocsp_authinfo.value:
|
||||||
|
if obj.access_method == AuthorityInformationAccessOID.OCSP:
|
||||||
|
return obj.access_location.value
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _request_status(self, response):
|
||||||
|
"""Checks that the OCSP response was successful from the text output"""
|
||||||
|
pattern = re.compile(r"OCSP Response Status: (.*)\n")
|
||||||
|
return "successful (0x0)" in pattern.findall(response)
|
||||||
|
|
||||||
|
def _response_successful(self, response, cert_path):
|
||||||
|
"""Checks that the certificate is not revoked"""
|
||||||
|
pattern = re.compile(r"{0}: (.*)\n".format(cert_path))
|
||||||
|
if "revoked" in pattern.findall(response):
|
||||||
|
raise errors.OCSPRevokedError("Certificate is revoked")
|
||||||
|
return "good" in pattern.findall(response)
|
||||||
|
|
||||||
|
class OCSPResponseHandler(OCSPBase):
|
||||||
|
"""Class for handling OCSP requests"""
|
||||||
|
def __init__(self, cert_path, chain_path):
|
||||||
|
self.cert_path = cert_path
|
||||||
|
self.chain_path = chain_path
|
||||||
|
|
||||||
|
def ocsp_request_to_file(self, filepath):
|
||||||
|
"""Make OCSP request and save the response to a file.
|
||||||
|
|
||||||
|
:param str filepath: Path to save the OCSP response to.
|
||||||
|
|
||||||
|
:returns: True if the response was successfully saved
|
||||||
|
:rtype: bool
|
||||||
|
|
||||||
|
:raises errors.OCSPRevokedError: If certificate is revoked.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
url, _ = self.determine_ocsp_server(self.cert_path)
|
||||||
|
cmd = ["openssl", "ocsp",
|
||||||
|
"-no_nonce",
|
||||||
|
"-issuer", self.chain_path,
|
||||||
|
"-cert", self.cert_path,
|
||||||
|
"-url", url,
|
||||||
|
"-CAfile", self.chain_path,
|
||||||
|
"-verify_other", self.chain_path,
|
||||||
|
"-trust_other",
|
||||||
|
"-respout", filepath,
|
||||||
|
"-text"]
|
||||||
|
try:
|
||||||
|
output, _ = util.run_script(cmd, log=logger.debug)
|
||||||
|
except errors.SubprocessError:
|
||||||
|
logger.info("OCSP check failed for %s (are we offline?)", self.cert_path)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self._request_status(output):
|
||||||
|
raise errors.OCSPRequestError("OCSP request returned an error")
|
||||||
|
try:
|
||||||
|
if self._response_successful(output, self.cert_path):
|
||||||
|
return os.path.isfile(filepath)
|
||||||
|
except errors.OCSPRevokedError:
|
||||||
|
logger.warning("Certificate %s is revoked.", self.cert_path)
|
||||||
|
raise
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class RevocationChecker(OCSPBase):
|
||||||
"This class figures out OCSP checking on this system, and performs it."
|
"This class figures out OCSP checking on this system, and performs it."
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -20,7 +114,7 @@ class RevocationChecker(object):
|
|||||||
self.broken = True
|
self.broken = True
|
||||||
return
|
return
|
||||||
|
|
||||||
# New versions of openssl want -header var=val, old ones want -header var val
|
# New versions of openssl want -header var=val, old ones want -header var val
|
||||||
test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"],
|
test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"],
|
||||||
stdout=PIPE, stderr=PIPE, universal_newlines=True)
|
stdout=PIPE, stderr=PIPE, universal_newlines=True)
|
||||||
_out, err = test_host_format.communicate()
|
_out, err = test_host_format.communicate()
|
||||||
@@ -69,30 +163,6 @@ class RevocationChecker(object):
|
|||||||
return _translate_ocsp_query(cert_path, output, err)
|
return _translate_ocsp_query(cert_path, output, err)
|
||||||
|
|
||||||
|
|
||||||
def determine_ocsp_server(self, cert_path):
|
|
||||||
"""Extract the OCSP server host from a certificate.
|
|
||||||
|
|
||||||
:param str cert_path: Path to the cert we're checking OCSP for
|
|
||||||
:rtype tuple:
|
|
||||||
:returns: (OCSP server URL or None, OCSP server host or None)
|
|
||||||
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
url, _err = util.run_script(
|
|
||||||
["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"],
|
|
||||||
log=logger.debug)
|
|
||||||
except errors.SubprocessError:
|
|
||||||
logger.info("Cannot extract OCSP URI from %s", cert_path)
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
url = url.rstrip()
|
|
||||||
host = url.partition("://")[2].rstrip("/")
|
|
||||||
if host:
|
|
||||||
return url, host
|
|
||||||
else:
|
|
||||||
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
|
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
|
||||||
"""Parse openssl's weird output to work out what it means."""
|
"""Parse openssl's weird output to work out what it means."""
|
||||||
|
|
||||||
|
|||||||
@@ -143,6 +143,60 @@ class AutoHSTSEnhancement(object):
|
|||||||
:type domains: str
|
:type domains: str
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class OCSPPrefetchEnhancement(object):
|
||||||
|
"""
|
||||||
|
Enhancement interface that installer plugins can implement in order to
|
||||||
|
provide functionality that prefetches an OCSP response and stores it
|
||||||
|
to be served for incoming client requests.
|
||||||
|
|
||||||
|
The plugins implementing new style enhancements are responsible of handling
|
||||||
|
the saving of configuration checkpoints as well as calling possible restarts
|
||||||
|
of managed software themselves. For update_ocsp_prefetch method, the installer
|
||||||
|
may have to call prepare() to finalize the plugin initialization.
|
||||||
|
|
||||||
|
Methods:
|
||||||
|
enable_ocsp_prefetch is called when the domain is configured to
|
||||||
|
serve OCSP responses using mechanism called OCSP Stapling.
|
||||||
|
|
||||||
|
update_ocsp_prefetch is called every time when Certbot is run using 'renew'
|
||||||
|
verb. Certbot should proceed to make a request to the OCSP server in order
|
||||||
|
to fetch an OCSP response and to store the recieved response, if valid.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def update_ocsp_prefetch(self, lineage, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Gets called for each lineage every time Certbot is run with 'renew' verb.
|
||||||
|
Implementation of this method should fetch a fresh OCSP response and if
|
||||||
|
valid, store it to be served for connecting clients.
|
||||||
|
|
||||||
|
:param lineage: Certificate lineage object
|
||||||
|
:type lineage: certbot.storage.RenewableCert
|
||||||
|
|
||||||
|
.. note:: prepare() method inherited from `interfaces.IPlugin` might need
|
||||||
|
to be called manually within implementation of this interface method
|
||||||
|
to finalize the plugin initialization.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def enable_ocsp_prefetch(self, lineage, domains, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Enables the OCSP enhancement, enabling OCSP Stapling functionality for
|
||||||
|
the controlled software, and sets it up for prefetching the responses
|
||||||
|
over the subsequent runs of Certbot renew.
|
||||||
|
|
||||||
|
:param lineage: Certificate lineage object
|
||||||
|
:type lineage: certbot.storage.RenewableCert
|
||||||
|
|
||||||
|
:param domains: List of domains in certificate to enhance
|
||||||
|
:type domains: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
# This is used to configure internal new style enhancements in Certbot. These
|
# This is used to configure internal new style enhancements in Certbot. These
|
||||||
# enhancement interfaces need to be defined in this file. Please do not modify
|
# enhancement interfaces need to be defined in this file. Please do not modify
|
||||||
# this list from plugin code.
|
# this list from plugin code.
|
||||||
@@ -160,5 +214,19 @@ _INDEX = [
|
|||||||
"updater_function": "update_autohsts",
|
"updater_function": "update_autohsts",
|
||||||
"deployer_function": "deploy_autohsts",
|
"deployer_function": "deploy_autohsts",
|
||||||
"enable_function": "enable_autohsts"
|
"enable_function": "enable_autohsts"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "OCSPPrefetch",
|
||||||
|
"cli_help": "Prefetch OCSP responses for certificates in order to be "+
|
||||||
|
"able to serve connecting clients fresh staple immediately",
|
||||||
|
"cli_flag": "--ocsp-prefetch",
|
||||||
|
"cli_flag_default": constants.CLI_DEFAULTS["ocsp_prefetch"],
|
||||||
|
"cli_groups": ["security", "enhance"],
|
||||||
|
"cli_dest": "ocsp_prefetch",
|
||||||
|
"cli_action": "store_true",
|
||||||
|
"class": OCSPPrefetchEnhancement,
|
||||||
|
"updater_function": "update_ocsp_prefetch",
|
||||||
|
"deployer_function": None,
|
||||||
|
"enable_function": "enable_ocsp_prefetch"
|
||||||
}
|
}
|
||||||
] # type: List[Dict[str, Any]]
|
] # type: List[Dict[str, Any]]
|
||||||
|
|||||||
@@ -73,20 +73,16 @@ class OCSPTest(unittest.TestCase):
|
|||||||
|
|
||||||
|
|
||||||
@mock.patch('certbot.ocsp.logger.info')
|
@mock.patch('certbot.ocsp.logger.info')
|
||||||
@mock.patch('certbot.util.run_script')
|
@mock.patch('certbot.ocsp.RevocationChecker._ocsp_host_from_cert')
|
||||||
def test_determine_ocsp_server(self, mock_run, mock_info):
|
def test_determine_ocsp_server(self, mock_host, mock_info):
|
||||||
uri = "http://ocsp.stg-int-x1.letsencrypt.org/"
|
uri = "http://ocsp.stg-int-x1.letsencrypt.org/"
|
||||||
host = "ocsp.stg-int-x1.letsencrypt.org"
|
host = "ocsp.stg-int-x1.letsencrypt.org"
|
||||||
mock_run.return_value = uri, ""
|
mock_host.return_value = uri
|
||||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host))
|
self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host))
|
||||||
mock_run.return_value = "ftp:/" + host + "/", ""
|
mock_host.return_value = "ftp:/" + host + "/"
|
||||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
||||||
self.assertEqual(mock_info.call_count, 1)
|
self.assertEqual(mock_info.call_count, 1)
|
||||||
|
|
||||||
c = "confusion"
|
|
||||||
mock_run.side_effect = errors.SubprocessError(c)
|
|
||||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
|
||||||
|
|
||||||
@mock.patch('certbot.ocsp.logger')
|
@mock.patch('certbot.ocsp.logger')
|
||||||
@mock.patch('certbot.util.run_script')
|
@mock.patch('certbot.util.run_script')
|
||||||
def test_translate_ocsp(self, mock_run, mock_log):
|
def test_translate_ocsp(self, mock_run, mock_log):
|
||||||
@@ -111,7 +107,6 @@ class OCSPTest(unittest.TestCase):
|
|||||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
|
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
|
||||||
self.assertEqual(mock_log.info.call_count, 1)
|
self.assertEqual(mock_log.info.call_count, 1)
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=line-too-long
|
# pylint: disable=line-too-long
|
||||||
openssl_confused = ("", """
|
openssl_confused = ("", """
|
||||||
/etc/letsencrypt/live/example.org/cert.pem: good
|
/etc/letsencrypt/live/example.org/cert.pem: good
|
||||||
|
|||||||
Reference in New Issue
Block a user