Compare commits
16 Commits
master
...
ocsp_apach
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c5c0bfceb | ||
|
|
0bc3eb208f | ||
|
|
edaacab2f7 | ||
|
|
f40a067c27 | ||
|
|
85ce3cabb4 | ||
|
|
373f8ac85a | ||
|
|
c296fd894c | ||
|
|
35c0d79390 | ||
|
|
ba9de53768 | ||
|
|
e4834da2c1 | ||
|
|
34899c1c14 | ||
|
|
ff5a8a83b4 | ||
|
|
9366633ae6 | ||
|
|
cec96a523c | ||
|
|
c20c722acf | ||
|
|
118bf1d930 |
@@ -7,6 +7,8 @@ Certbot adheres to [Semantic Versioning](http://semver.org/).
|
||||
### Added
|
||||
|
||||
* `revoke` accepts `--cert-name`, and doesn't accept both `--cert-name` and `--cert-path`.
|
||||
* Added OCSP prefetching functionality for Apache plugin that attempts to refresh the OCSP
|
||||
response cache for managed certificates when scheduled Certbot renew is being run.
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
""" Utility functions for certbot-apache plugin """
|
||||
import binascii
|
||||
import os
|
||||
import six
|
||||
import struct
|
||||
import time
|
||||
|
||||
from certbot import crypto_util
|
||||
from certbot import util
|
||||
|
||||
def get_mod_deps(mod_name):
|
||||
@@ -104,3 +108,47 @@ def parse_define_file(filepath, varname):
|
||||
def unique_id():
|
||||
""" Returns an unique id to be used as a VirtualHost identifier"""
|
||||
return binascii.hexlify(os.urandom(16)).decode("utf-8")
|
||||
|
||||
|
||||
def get_apache_ocsp_struct(ttl, ocsp_response):
|
||||
"""Create Apache OCSP response structure to be used in response cache
|
||||
|
||||
:param int ttl: Time-To-Live in seocnds
|
||||
:param str ocsp_response: OCSP response data
|
||||
|
||||
:returns: Apache OCSP structure
|
||||
:rtype: `str`
|
||||
|
||||
"""
|
||||
|
||||
ttl = time.time() + ttl
|
||||
# As microseconds
|
||||
ttl_struct = struct.pack('l', int(ttl*1000000))
|
||||
return b'\x01'.join([ttl_struct, ocsp_response])
|
||||
|
||||
|
||||
def certid_sha1_hex(cert_path):
|
||||
"""Hex representation of certificate SHA1 fingerprint
|
||||
|
||||
:param str cert_path: File path to certificate
|
||||
|
||||
:returns: Hex representation SHA1 fingerprint of certificate
|
||||
:rtype: `str`
|
||||
|
||||
"""
|
||||
sha1_hex = binascii.hexlify(certid_sha1(cert_path))
|
||||
if isinstance(sha1_hex, six.binary_type):
|
||||
return sha1_hex.decode('utf-8') # pragma: no cover
|
||||
return sha1_hex # pragma: no cover
|
||||
|
||||
|
||||
def certid_sha1(cert_path):
|
||||
"""SHA1 fingerprint of certificate
|
||||
|
||||
:param str cert_path: File path to certificate
|
||||
|
||||
:returns: SHA1 fingerprint bytestring
|
||||
:rtype: `str`
|
||||
|
||||
"""
|
||||
return crypto_util.cert_sha1_fingerprint(cert_path)
|
||||
|
||||
@@ -6,6 +6,7 @@ import logging
|
||||
import os
|
||||
import pkg_resources
|
||||
import re
|
||||
import shutil
|
||||
import six
|
||||
import socket
|
||||
import time
|
||||
@@ -18,6 +19,7 @@ from acme.magic_typing import Any, DefaultDict, Dict, List, Set, Union # pylint
|
||||
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import ocsp
|
||||
from certbot import util
|
||||
|
||||
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
|
||||
@@ -36,6 +38,7 @@ from certbot_apache import tls_sni_01
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -188,6 +191,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
|
||||
# Temporary state for AutoHSTS enhancement
|
||||
self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]]
|
||||
self._ocsp_prefetch = {} # type: Dict[str, str]
|
||||
self._ocsp_dbm_bsddb = False
|
||||
|
||||
# These will be set in the prepare function
|
||||
self._prepared = False
|
||||
@@ -1692,7 +1697,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
self.parser.find_dir("SSLCertificateKeyFile",
|
||||
lineage.key_path, vhost.path))
|
||||
|
||||
def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
|
||||
def _enable_ocsp_stapling(self, ssl_vhost, unused_options, prefetch=False):
|
||||
"""Enables OCSP Stapling
|
||||
|
||||
In OCSP, each client (e.g. browser) would have to query the
|
||||
@@ -1713,8 +1718,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
:param unused_options: Not currently used
|
||||
:type unused_options: Not Available
|
||||
|
||||
:returns: Success, general_vhost (HTTP vhost)
|
||||
:rtype: (bool, :class:`~certbot_apache.obj.VirtualHost`)
|
||||
:param prefetch: Use OCSP prefetching
|
||||
:type prefetch: bool
|
||||
|
||||
"""
|
||||
min_apache_ver = (2, 3, 3)
|
||||
@@ -1723,8 +1728,15 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
"Unable to set OCSP directives.\n"
|
||||
"Apache version is below 2.3.3.")
|
||||
|
||||
if "socache_shmcb_module" not in self.parser.modules:
|
||||
self.enable_mod("socache_shmcb")
|
||||
if prefetch:
|
||||
if "socache_dbm_module" not in self.parser.modules:
|
||||
self.enable_mod("socache_dbm")
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||
cache_dir = ["dbm:"+cache_path]
|
||||
else:
|
||||
if "socache_shmcb_module" not in self.parser.modules:
|
||||
self.enable_mod("socache_shmcb")
|
||||
cache_dir = ["shmcb:/var/run/apache2/stapling_cache(128000)"]
|
||||
|
||||
# Check if there's an existing SSLUseStapling directive on.
|
||||
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
|
||||
@@ -1745,8 +1757,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
|
||||
|
||||
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
|
||||
"SSLStaplingCache",
|
||||
["shmcb:/var/run/apache2/stapling_cache(128000)"])
|
||||
"SSLStaplingCache", cache_dir)
|
||||
|
||||
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
|
||||
ssl_vhost.filep)
|
||||
@@ -2171,7 +2182,17 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
|
||||
"""
|
||||
self.config_test()
|
||||
|
||||
if not self._ocsp_prefetch:
|
||||
# Try to populate OCSP prefetch structure from pluginstorage
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if self._ocsp_prefetch:
|
||||
# OCSP prefetching is enabled, so back up the db
|
||||
self._ocsp_prefetch_backup_db()
|
||||
self._reload()
|
||||
if self._ocsp_prefetch:
|
||||
# Restore the backed up dbm database
|
||||
self._ocsp_prefetch_restore_db()
|
||||
|
||||
def _reload(self):
|
||||
"""Reloads the Apache server.
|
||||
@@ -2494,5 +2515,222 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
||||
# Update AutoHSTS storage (We potentially removed vhosts from managed)
|
||||
self._autohsts_save_state()
|
||||
|
||||
def _ensure_ocsp_dirs(self):
|
||||
"""Makes sure that the OCSP directory paths exist."""
|
||||
ocsp_work = os.path.join(self.config.work_dir, "ocsp")
|
||||
ocsp_save = os.path.join(self.config.config_dir, "ocsp")
|
||||
for path in [ocsp_work, ocsp_save]:
|
||||
if not os.path.isdir(path):
|
||||
os.makedirs(path)
|
||||
os.chmod(path, 0o755)
|
||||
|
||||
def _ensure_ocsp_prefetch_compatibility(self):
|
||||
"""Make sure that the operating system supports the required libraries
|
||||
to manage Apache DBM files.
|
||||
|
||||
:raises: errors.NotSupportedError
|
||||
"""
|
||||
try:
|
||||
import bsddb # pylint: disable=unused-variable
|
||||
except ImportError:
|
||||
import dbm
|
||||
if not hasattr(dbm, 'ndbm') or dbm.ndbm.library != 'Berkeley DB': # pylint: disable=no-member
|
||||
msg = ("Unfortunately your operating system does not have a "
|
||||
"compatible database module available for managing "
|
||||
"Apache OCSP stapling cache database.")
|
||||
raise errors.NotSupportedError(msg)
|
||||
|
||||
def _ocsp_dbm_open(self, filepath):
|
||||
"""Helper method to open an DBM file in a way that depends on the platform
|
||||
that Certbot is run on. Returns an open database structure."""
|
||||
if not os.path.isfile(filepath+".db"):
|
||||
raise errors.PluginError(
|
||||
"The OCSP stapling cache DBM file wasn't created by Apache.")
|
||||
try:
|
||||
import bsddb
|
||||
self._ocsp_dbm_bsddb = True
|
||||
cache_path = filepath + ".db"
|
||||
try:
|
||||
database = bsddb.hashopen(cache_path, 'w')
|
||||
except Exception:
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
except ImportError:
|
||||
# Python3 doesn't have bsddb module, so we use dbm.ndbm instead
|
||||
import dbm
|
||||
try:
|
||||
database = dbm.ndbm.open(filepath, 'w') # pylint: disable=no-member
|
||||
except Exception:
|
||||
# This is raised if a file cannot be found
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
return database
|
||||
|
||||
def _ocsp_dbm_close(self, database):
|
||||
"""Helper method to sync and close a DBM file, in a way required by the
|
||||
used dbm implementation."""
|
||||
if self._ocsp_dbm_bsddb:
|
||||
database.sync()
|
||||
database.close()
|
||||
else:
|
||||
database.close()
|
||||
|
||||
def _ocsp_refresh_if_needed(self, pf_obj):
|
||||
"""Refreshes OCSP response for a certiifcate if it's due
|
||||
|
||||
:param dict pf_obj: OCSP prefetch object from pluginstorage
|
||||
|
||||
:returns: If OCSP response was updated
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL
|
||||
if ttl < time.time():
|
||||
self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"])
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _ocsp_refresh(self, cert_path, chain_path):
|
||||
"""Refresh the OCSP response for a certificate
|
||||
|
||||
:param str cert_path: Filesystem path to certificate file
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
|
||||
self._ensure_ocsp_dirs()
|
||||
handler = ocsp.OCSPResponseHandler(cert_path, chain_path)
|
||||
ocsp_workfile = os.path.join(
|
||||
self.config.work_dir, "ocsp",
|
||||
apache_util.certid_sha1_hex(cert_path))
|
||||
if handler.ocsp_request_to_file(ocsp_workfile):
|
||||
# Guaranteed good response
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache")
|
||||
# dbm.open automatically adds the file extension, it will be
|
||||
db = self._ocsp_dbm_open(cache_path)
|
||||
#db = dbm.open(cache_path, "c")
|
||||
cert_sha = apache_util.certid_sha1(cert_path)
|
||||
db[cert_sha] = self._ocsp_response_dbm(ocsp_workfile)
|
||||
self._ocsp_dbm_close(db)
|
||||
#db.close()
|
||||
else:
|
||||
logger.warning("Encountered an issue while trying to prefetch OCSP "
|
||||
"response for certificate: %s", cert_path)
|
||||
|
||||
def _ocsp_response_dbm(self, workfile):
|
||||
"""Creates a dbm entry for OCSP response data
|
||||
|
||||
:param str workfile: File path for raw OCSP response
|
||||
|
||||
:returns: OCSP response cache data that Apache can use
|
||||
:rtype: string
|
||||
"""
|
||||
|
||||
with open(workfile, 'rb') as fh:
|
||||
response = fh.read()
|
||||
ttl = constants.OCSP_APACHE_TTL
|
||||
return apache_util.get_apache_ocsp_struct(ttl, response)
|
||||
|
||||
def _ocsp_prefetch_save(self, cert_path, chain_path):
|
||||
"""Saves status of current OCSP prefetch, including the last update
|
||||
time to determine if an update is needed on later run.
|
||||
|
||||
:param str cert_path: Filesystem path to certificate
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
status = {
|
||||
"lastupdate": time.time(),
|
||||
"cert_path": cert_path,
|
||||
"chain_path": chain_path
|
||||
}
|
||||
cert_id = apache_util.certid_sha1_hex(cert_path)
|
||||
self._ocsp_prefetch[cert_id] = status
|
||||
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
|
||||
self.storage.save()
|
||||
|
||||
def _ocsp_prefetch_fetch_state(self):
|
||||
"""
|
||||
Populates the OCSP prefetch state from the pluginstorage.
|
||||
"""
|
||||
try:
|
||||
self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch")
|
||||
except KeyError:
|
||||
self._ocsp_prefetch = dict()
|
||||
|
||||
def _ocsp_prefetch_backup_db(self):
|
||||
"""
|
||||
Copies the active dbm file to work directory.
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp"))
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue while trying to backup OCSP dbm file")
|
||||
|
||||
def _ocsp_prefetch_restore_db(self):
|
||||
"""
|
||||
Restores the active dbm file from work directory.
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||
work_file_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(work_file_path, cache_path)
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue when trying to restore OCSP dbm file")
|
||||
|
||||
def enable_ocsp_prefetch(self, lineage, domains):
|
||||
"""Enable OCSP Stapling and prefetching of the responses.
|
||||
|
||||
In OCSP, each client (e.g. browser) would have to query the
|
||||
OCSP Responder to validate that the site certificate was not revoked.
|
||||
|
||||
Enabling OCSP Stapling, would allow the web-server to query the OCSP
|
||||
Responder, and staple its response to the offered certificate during
|
||||
TLS. i.e. clients would not have to query the OCSP responder.
|
||||
|
||||
"""
|
||||
|
||||
# Fail early if we are not able to support this
|
||||
self._ensure_ocsp_prefetch_compatibility()
|
||||
prefetch_vhosts = set()
|
||||
for domain in domains:
|
||||
matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
|
||||
# We should be handling only SSL vhosts
|
||||
for vh in matched_vhosts:
|
||||
if vh.ssl:
|
||||
prefetch_vhosts.add(vh)
|
||||
|
||||
if prefetch_vhosts:
|
||||
for vh in prefetch_vhosts:
|
||||
self._enable_ocsp_stapling(vh, None, prefetch=True)
|
||||
self.restart()
|
||||
try:
|
||||
self._ocsp_refresh(lineage.cert_path, lineage.chain_path)
|
||||
self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path)
|
||||
self.save("Enabled OCSP prefetching")
|
||||
except errors.PluginError as err:
|
||||
# Revert the OCSP prefetch configuration
|
||||
self.recovery_routine()
|
||||
self.restart()
|
||||
msg = ("Encountered an error while trying to enable OCSP prefetch "
|
||||
"enhancement: %s.\nOCSP prefetch was not enabled.")
|
||||
raise errors.PluginError(msg % str(err))
|
||||
|
||||
def update_ocsp_prefetch(self, _unused_lineage):
|
||||
"""Checks all certificates that are managed by OCSP prefetch, and
|
||||
refreshes OCSP responses for them if required."""
|
||||
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if not self._ocsp_prefetch:
|
||||
# No OCSP prefetching enabled for any certificate
|
||||
return
|
||||
|
||||
for _, pf in self._ocsp_prefetch.items():
|
||||
if self._ocsp_refresh_if_needed(pf):
|
||||
# Save the status to pluginstorage
|
||||
self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"])
|
||||
|
||||
|
||||
AutoHSTSEnhancement.register(ApacheConfigurator) # pylint: disable=no-member
|
||||
|
||||
@@ -61,3 +61,9 @@ AUTOHSTS_FREQ = 172800
|
||||
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
|
||||
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
|
||||
"""Managed by Certbot comments and the VirtualHost identification template"""
|
||||
|
||||
OCSP_APACHE_TTL = 432000
|
||||
"""Apache TTL for OCSP response: 5 days"""
|
||||
|
||||
OCSP_INTERNAL_TTL = 86400
|
||||
"""Internal TTL for OCSP response: 1 day"""
|
||||
|
||||
@@ -12,6 +12,8 @@ from certbot import util
|
||||
from certbot_apache import apache_util
|
||||
from certbot_apache import configurator
|
||||
|
||||
from certbot.plugins.enhancements import OCSPPrefetchEnhancement
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@zope.interface.provider(interfaces.IPluginFactory)
|
||||
@@ -142,3 +144,6 @@ class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
self.reverter.register_undo_command(
|
||||
temp, [self.option("dismod"), "-f", mod_name])
|
||||
util.run_script([self.option("enmod"), mod_name])
|
||||
|
||||
|
||||
OCSPPrefetchEnhancement.register(DebianConfigurator) # pylint: disable=no-member
|
||||
|
||||
327
certbot-apache/certbot_apache/tests/ocsp_prefetch_test.py
Normal file
327
certbot-apache/certbot_apache/tests/ocsp_prefetch_test.py
Normal file
@@ -0,0 +1,327 @@
|
||||
"""Test for certbot_apache.configurator OCSP Prefetching functionality"""
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import unittest
|
||||
import mock
|
||||
# six is used in mock.patch()
|
||||
import six # pylint: disable=unused-import
|
||||
import sys
|
||||
|
||||
from acme.magic_typing import Dict, List, Set, Union # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import errors
|
||||
from certbot_apache.tests import util
|
||||
|
||||
|
||||
class MockDBM(object):
|
||||
# pylint: disable=missing-docstring
|
||||
"""Main mock DBM class for Py3 dbm module"""
|
||||
def __init__(self):
|
||||
self.ndbm = Mockdbm_impl()
|
||||
|
||||
|
||||
class Mockdbm_impl(object):
|
||||
"""Mock dbm implementation that satisfies both bsddb and dbm interfaces"""
|
||||
# pylint: disable=missing-docstring
|
||||
|
||||
def __init__(self):
|
||||
self.library = 'Berkeley DB'
|
||||
self.name = 'ndbm'
|
||||
|
||||
def open(self, path, mode):
|
||||
return Mockdb(path, mode)
|
||||
|
||||
def hashopen(self, path, mode):
|
||||
return Mockdb(path, mode)
|
||||
|
||||
|
||||
class Mockdb(object):
|
||||
"""Mock dbm.db for both bsddb and dbm databases"""
|
||||
# pylint: disable=missing-docstring
|
||||
def __init__(self, path, mode):
|
||||
self._data = dict() # type: Dict[str, str]
|
||||
if mode == "r" or mode == "w":
|
||||
if not path.endswith(".db"):
|
||||
path = path+".db"
|
||||
with open(path, 'r') as fh:
|
||||
try:
|
||||
self._data = json.loads(fh.read())
|
||||
except Exception: # pylint: disable=broad-except
|
||||
self._data = dict()
|
||||
self.path = path
|
||||
self.mode = mode
|
||||
|
||||
def __setitem__(self, key, item):
|
||||
bkey = base64.b64encode(key)
|
||||
bitem = base64.b64encode(item)
|
||||
self._data[bkey.decode()] = bitem.decode()
|
||||
|
||||
def __getitem__(self, key):
|
||||
bkey = base64.b64encode(key)
|
||||
return base64.b64decode(self._data[bkey.decode()])
|
||||
|
||||
def keys(self):
|
||||
return [base64.b64decode(k) for k in self._data.keys()]
|
||||
|
||||
def sync(self):
|
||||
return
|
||||
|
||||
def close(self):
|
||||
with open(self.path, 'w') as fh:
|
||||
fh.write(json.dumps(self._data))
|
||||
|
||||
|
||||
class OCSPPrefetchTest(util.ApacheTest):
|
||||
"""Tests for OCSP Prefetch feature"""
|
||||
# pylint: disable=protected-access
|
||||
|
||||
def setUp(self): # pylint: disable=arguments-differ
|
||||
super(OCSPPrefetchTest, self).setUp()
|
||||
|
||||
self.config = util.get_apache_configurator(
|
||||
self.config_path, self.vhost_path, self.config_dir, self.work_dir,
|
||||
os_info="debian")
|
||||
|
||||
self.lineage = mock.MagicMock(cert_path="cert", chain_path="chain")
|
||||
self.config.parser.modules.add("headers_module")
|
||||
self.config.parser.modules.add("mod_headers.c")
|
||||
self.config.parser.modules.add("ssl_module")
|
||||
self.config.parser.modules.add("mod_ssl.c")
|
||||
self.config.parser.modules.add("socache_dbm_module")
|
||||
self.config.parser.modules.add("mod_socache_dbm.c")
|
||||
|
||||
self.vh_truth = util.get_vh_truth(
|
||||
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
|
||||
self.config._ensure_ocsp_dirs()
|
||||
self.db_path = os.path.join(self.config_dir, "ocsp", "ocsp_cache")
|
||||
self.db_fullpath = self.db_path + ".db"
|
||||
|
||||
def _call_mocked(self, func, *args, **kwargs):
|
||||
"""Helper method to call functins with mock stack"""
|
||||
|
||||
db_fullpath = self.db_path + ".db"
|
||||
def mock_restart():
|
||||
"""Mock ApacheConfigurator.restart that creates the dbm file"""
|
||||
# Mock the Apache dbm file creation
|
||||
open(db_fullpath, 'a').close()
|
||||
|
||||
ver_path = "certbot_apache.configurator.ApacheConfigurator.get_version"
|
||||
res_path = "certbot_apache.configurator.ApacheConfigurator.restart"
|
||||
cry_path = "certbot.crypto_util.cert_sha1_fingerprint"
|
||||
|
||||
with mock.patch(ver_path) as mock_ver:
|
||||
mock_ver.return_value = (2, 4, 10)
|
||||
with mock.patch(cry_path) as mock_cry:
|
||||
mock_cry.return_value = b'j\x056\x1f\xfa\x08B\xe8D\xa1Bn\xeb*A\xebWx\xdd\xfe'
|
||||
with mock.patch(res_path, side_effect=mock_restart):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
def call_mocked_py2(self, func, *args, **kwargs):
|
||||
"""Calls methods with imports mocked to suit Py2 environment"""
|
||||
if 'dbm' in sys.modules.keys():
|
||||
sys.modules['dbm'] = None
|
||||
sys.modules['bsddb'] = Mockdbm_impl()
|
||||
return self._call_mocked(func, *args, **kwargs)
|
||||
|
||||
def call_mocked_py3(self, func, *args, **kwargs):
|
||||
"""Calls methods with imports mocked to suit Py3 environment"""
|
||||
real_import = six.moves.builtins.__import__
|
||||
|
||||
def mock_import(*args, **kwargs):
|
||||
"""Mock import to raise ImportError for Py2 specific module to make
|
||||
ApacheConfigurator pick the correct one for Python3 regardless of the
|
||||
python version the tests are running under."""
|
||||
if args[0] == "bsddb":
|
||||
raise ImportError
|
||||
return real_import(*args, **kwargs)
|
||||
|
||||
with mock.patch('six.moves.builtins.__import__', side_effect=mock_import):
|
||||
sys.modules['dbm'] = MockDBM()
|
||||
return self._call_mocked(func, *args, **kwargs)
|
||||
|
||||
@mock.patch("certbot_apache.override_debian.DebianConfigurator.enable_mod")
|
||||
def test_ocsp_prefetch_enable_mods(self, mock_enable):
|
||||
self.config.parser.modules.discard("socache_dbm_module")
|
||||
self.config.parser.modules.discard("mod_socache_dbm.c")
|
||||
self.config.parser.modules.discard("headers_module")
|
||||
self.config.parser.modules.discard("mod_header.c")
|
||||
|
||||
ref_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
with mock.patch(ref_path):
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertTrue(mock_enable.called)
|
||||
self.assertEquals(len(self.config._ocsp_prefetch), 1)
|
||||
|
||||
@mock.patch("certbot_apache.override_debian.DebianConfigurator.enable_mod")
|
||||
def test_ocsp_prefetch_enable_error(self, _mock_enable):
|
||||
ref_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
self.config.recovery_routine = mock.MagicMock()
|
||||
with mock.patch(ref_path, side_effect=errors.PluginError("failed")):
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.call_mocked_py2,
|
||||
self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertTrue(self.config.recovery_routine.called)
|
||||
|
||||
@mock.patch("certbot_apache.constants.OCSP_INTERNAL_TTL", 0)
|
||||
def test_ocsp_prefetch_refresh(self):
|
||||
def ocsp_req_mock(workfile):
|
||||
"""Method to mock the OCSP request and write response to file"""
|
||||
with open(workfile, 'w') as fh:
|
||||
fh.write("MOCKRESPONSE")
|
||||
return True
|
||||
|
||||
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
odbm = self.config._ocsp_dbm_open(self.db_path)
|
||||
self.assertEquals(len(odbm.keys()), 1)
|
||||
# The actual response data is prepended by Apache timestamp
|
||||
self.assertTrue(odbm[list(odbm.keys())[0]].endswith(b'MOCKRESPONSE'))
|
||||
self.config._ocsp_dbm_close(odbm)
|
||||
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock) as mock_ocsp:
|
||||
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
|
||||
self.assertTrue(mock_ocsp.called)
|
||||
|
||||
def test_ocsp_prefetch_refresh_noop(self):
|
||||
def ocsp_req_mock(workfile):
|
||||
"""Method to mock the OCSP request and write response to file"""
|
||||
with open(workfile, 'w') as fh:
|
||||
fh.write("MOCKRESPONSE")
|
||||
return True
|
||||
|
||||
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertEquals(len(self.config._ocsp_prefetch), 1)
|
||||
refresh_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
with mock.patch(refresh_path) as mock_refresh:
|
||||
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
|
||||
def test_ocsp_prefetch_backup_db(self, _mock_test):
|
||||
def ocsp_del_db():
|
||||
"""Side effect of _reload() that deletes the DBM file, like Apache
|
||||
does when restarting"""
|
||||
os.remove(self.db_fullpath)
|
||||
self.assertFalse(os.path.isfile(self.db_fullpath))
|
||||
|
||||
# Make sure that the db file exists
|
||||
open(self.db_fullpath, 'a').close()
|
||||
odbm = self.call_mocked_py2(self.config._ocsp_dbm_open, self.db_path)
|
||||
odbm[b'mock_key'] = b'mock_value'
|
||||
self.config._ocsp_dbm_close(odbm)
|
||||
|
||||
# Mock OCSP prefetch dict to signify that there should be a db
|
||||
self.config._ocsp_prefetch = {"mock": "value"}
|
||||
rel_path = "certbot_apache.configurator.ApacheConfigurator._reload"
|
||||
with mock.patch(rel_path, side_effect=ocsp_del_db):
|
||||
self.config.restart()
|
||||
|
||||
odbm = self.config._ocsp_dbm_open(self.db_path)
|
||||
self.assertEquals(odbm[b'mock_key'], b'mock_value')
|
||||
self.config._ocsp_dbm_close(odbm)
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._reload")
|
||||
def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test):
|
||||
log_path = "certbot_apache.configurator.logger.debug"
|
||||
log_string = "Encountered an issue while trying to backup OCSP dbm file"
|
||||
log_string2 = "Encountered an issue when trying to restore OCSP dbm file"
|
||||
self.config._ocsp_prefetch = {"mock": "value"}
|
||||
with mock.patch("shutil.copy2", side_effect=IOError):
|
||||
with mock.patch(log_path) as mock_log:
|
||||
self.config.restart()
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertEquals(mock_log.call_count, 2)
|
||||
self.assertTrue(log_string in mock_log.call_args_list[0][0][0])
|
||||
self.assertTrue(log_string2 in mock_log.call_args_list[1][0][0])
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
|
||||
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||
log_path = "certbot_apache.configurator.logger.warning"
|
||||
with mock.patch(ocsp_path) as mock_ocsp:
|
||||
mock_ocsp.return_value = False
|
||||
with mock.patch(log_path) as mock_log:
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertTrue(
|
||||
"trying to prefetch OCSP" in mock_log.call_args[0][0])
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._ocsp_refresh_if_needed")
|
||||
def test_ocsp_prefetch_update_noop(self, mock_refresh):
|
||||
self.config.update_ocsp_prefetch(None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
||||
def test_ocsp_prefetch_preflight_check_noerror(self):
|
||||
self.call_mocked_py2(self.config._ensure_ocsp_prefetch_compatibility)
|
||||
self.call_mocked_py3(self.config._ensure_ocsp_prefetch_compatibility)
|
||||
mockdbm_path = "certbot_apache.tests.ocsp_prefetch_test.Mockdbm_impl"
|
||||
with mock.patch(mockdbm_path) as mock_dbm:
|
||||
mock_dbm.library = 'Not Berkeley DB'
|
||||
self.assertRaises(errors.NotSupportedError,
|
||||
self.call_mocked_py3,
|
||||
self.config._ensure_ocsp_prefetch_compatibility)
|
||||
|
||||
def test_ocsp_prefetch_open_dbm_no_file(self):
|
||||
open(self.db_fullpath, 'a').close()
|
||||
db_not_exists = self.db_path+"nonsense"
|
||||
self.call_mocked_py2(self.config._ocsp_dbm_open, self.db_path)
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.call_mocked_py2, self.config._ocsp_dbm_open, db_not_exists)
|
||||
|
||||
def test_ocsp_prefetch_py2_open_file_error(self):
|
||||
open(self.db_fullpath, 'a').close()
|
||||
mock_db = mock.MagicMock()
|
||||
mock_db.hashopen.side_effect = Exception("error")
|
||||
sys.modules["bsddb"] = mock_db
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.config._ocsp_dbm_open,
|
||||
self.db_path)
|
||||
|
||||
def test_ocsp_prefetch_py3_open_file_error(self):
|
||||
open(self.db_fullpath, 'a').close()
|
||||
mock_db = mock.MagicMock()
|
||||
mock_db.ndbm.open.side_effect = Exception("error")
|
||||
sys.modules["dbm"] = mock_db
|
||||
sys.modules["bsddb"] = None
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.config._ocsp_dbm_open,
|
||||
self.db_path)
|
||||
|
||||
def test_ocsp_prefetch_open_close_py2_noerror(self):
|
||||
expected_val = b'whatever_value'
|
||||
open(self.db_fullpath, 'a').close()
|
||||
db = self.call_mocked_py2(
|
||||
self.config._ocsp_dbm_open, self.db_path)
|
||||
db[b'key'] = expected_val
|
||||
self.call_mocked_py2(self.config._ocsp_dbm_close, db)
|
||||
db2 = self.call_mocked_py2(self.config._ocsp_dbm_open, self.db_path)
|
||||
self.assertEquals(db2[b'key'], expected_val)
|
||||
|
||||
def test_ocsp_prefetch_open_close_py3_noerror(self):
|
||||
expected_val = b'whatever_value'
|
||||
open(self.db_fullpath, 'a').close()
|
||||
db = self.call_mocked_py3(
|
||||
self.config._ocsp_dbm_open, self.db_path)
|
||||
db[b'key'] = expected_val
|
||||
self.call_mocked_py3(self.config._ocsp_dbm_close, db)
|
||||
db2 = self.call_mocked_py3(self.config._ocsp_dbm_open, self.db_path)
|
||||
self.assertEquals(db2[b'key'], expected_val)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
@@ -60,6 +60,7 @@ CLI_DEFAULTS = dict(
|
||||
redirect=None,
|
||||
auto_hsts=False,
|
||||
hsts=None,
|
||||
ocsp_prefetch=False,
|
||||
uir=None,
|
||||
staple=None,
|
||||
strict_permissions=False,
|
||||
|
||||
@@ -14,6 +14,7 @@ import six
|
||||
import zope.component
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
|
||||
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
||||
@@ -225,6 +226,18 @@ def verify_renewable_cert(renewable_cert):
|
||||
verify_cert_matches_priv_key(renewable_cert.cert, renewable_cert.privkey)
|
||||
|
||||
|
||||
def load_cert(cert_path):
|
||||
"""Reads the certificate PEM file and returns a cryptography.x509 object
|
||||
|
||||
:param str cert_path: Path to the certificate
|
||||
:rtype `cryptography.x509`:
|
||||
:returns: x509 certificate object
|
||||
"""
|
||||
with open(cert_path, 'rb') as fh:
|
||||
cert_pem = fh.read()
|
||||
return x509.load_pem_x509_certificate(cert_pem, default_backend())
|
||||
|
||||
|
||||
def verify_renewable_cert_sig(renewable_cert):
|
||||
""" Verifies the signature of a `.storage.RenewableCert` object.
|
||||
|
||||
@@ -235,8 +248,7 @@ def verify_renewable_cert_sig(renewable_cert):
|
||||
try:
|
||||
with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes]
|
||||
chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend())
|
||||
with open(renewable_cert.cert, 'rb') as cert_file: # type: IO[bytes]
|
||||
cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
|
||||
cert = load_cert(renewable_cert.cert)
|
||||
pk = chain.public_key()
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore")
|
||||
@@ -459,6 +471,13 @@ def sha256sum(filename):
|
||||
sha256.update(f.read())
|
||||
return sha256.hexdigest()
|
||||
|
||||
|
||||
def cert_sha1_fingerprint(cert_path):
|
||||
"""Get sha1 digest of the certificate fingerprint"""
|
||||
cert = load_cert(cert_path)
|
||||
return cert.fingerprint(hashes.SHA1())
|
||||
|
||||
|
||||
def cert_and_chain_from_fullchain(fullchain_pem):
|
||||
"""Split fullchain_pem into cert_pem and chain_pem
|
||||
|
||||
|
||||
@@ -108,3 +108,12 @@ class ConfigurationError(Error):
|
||||
|
||||
class MissingCommandlineFlag(Error):
|
||||
"""A command line argument was missing in noninteractive usage"""
|
||||
|
||||
# OCSP errors:
|
||||
|
||||
class OCSPRevokedError(Error):
|
||||
"""Certificate is revoked based on the OCSP response"""
|
||||
|
||||
|
||||
class OCSPRequestError(Error):
|
||||
"""Error in OCSP request"""
|
||||
|
||||
@@ -904,6 +904,8 @@ def enhance(config, plugins):
|
||||
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
||||
if not config.chain_path:
|
||||
config.chain_path = lineage.chain_path
|
||||
if not config.cert_path:
|
||||
config.cert_path = lineage.cert_path
|
||||
if oldstyle_enh:
|
||||
le_client = _init_le_client(config, authenticator=None, installer=installer)
|
||||
le_client.enhance_config(domains, config.chain_path, ask_redirect=False)
|
||||
|
||||
126
certbot/ocsp.py
126
certbot/ocsp.py
@@ -1,15 +1,113 @@
|
||||
"""Tools for checking certificate revocation."""
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
|
||||
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RevocationChecker(object):
|
||||
class OCSPBase(object):
|
||||
"""Base class for OCSP request operations"""
|
||||
|
||||
def determine_ocsp_server(self, cert_path):
|
||||
"""Extract the OCSP server host from a certificate.
|
||||
|
||||
:param str cert_path: Path to the cert we're checking OCSP for
|
||||
:rtype tuple:
|
||||
:returns: (OCSP server URL or None, OCSP server host or None)
|
||||
|
||||
"""
|
||||
try:
|
||||
url = self._ocsp_host_from_cert(cert_path)
|
||||
if url:
|
||||
url = url.strip()
|
||||
host = url.partition("://")[2].rstrip("/")
|
||||
except IOError:
|
||||
url = host = None
|
||||
|
||||
if host:
|
||||
return url, host
|
||||
else:
|
||||
logger.info("Cannot process OCSP host for cert at %s", cert_path)
|
||||
return None, None
|
||||
|
||||
def _ocsp_host_from_cert(self, cert_path):
|
||||
"""Helper method for determine_ocsp_server to read the actual OCSP
|
||||
server information from a certificate file"""
|
||||
cert = crypto_util.load_cert(cert_path)
|
||||
ocsp_authinfo = cert.extensions.get_extension_for_oid(
|
||||
ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
|
||||
for obj in ocsp_authinfo.value:
|
||||
if obj.access_method == AuthorityInformationAccessOID.OCSP:
|
||||
return obj.access_location.value
|
||||
|
||||
def _request_success(self, response):
|
||||
"""Checks that the OCSP response was successful from the text output"""
|
||||
pattern = re.compile(r"OCSP Response Status: (.*)\n")
|
||||
return "successful (0x0)" in pattern.findall(response)
|
||||
|
||||
def _response_successful(self, response, cert_path):
|
||||
"""Checks that the certificate is not revoked"""
|
||||
pattern = re.compile(r"{0}: (.*)\n".format(cert_path))
|
||||
if "revoked" in pattern.findall(response):
|
||||
raise errors.OCSPRevokedError("Certificate is revoked")
|
||||
return "good" in pattern.findall(response)
|
||||
|
||||
|
||||
class OCSPResponseHandler(OCSPBase):
|
||||
"""Class for handling OCSP requests"""
|
||||
def __init__(self, cert_path, chain_path):
|
||||
self.cert_path = cert_path
|
||||
self.chain_path = chain_path
|
||||
|
||||
def ocsp_request_to_file(self, filepath):
|
||||
"""Make OCSP request and save the response to a file.
|
||||
|
||||
:param str filepath: Path to save the OCSP response to.
|
||||
|
||||
:returns: True if the response was successfully saved
|
||||
:rtype: bool
|
||||
|
||||
:raises errors.OCSPRevokedError: If certificate is revoked.
|
||||
|
||||
"""
|
||||
|
||||
url, _ = self.determine_ocsp_server(self.cert_path)
|
||||
cmd = ["openssl", "ocsp",
|
||||
"-no_nonce",
|
||||
"-issuer", self.chain_path,
|
||||
"-cert", self.cert_path,
|
||||
"-url", url,
|
||||
"-CAfile", self.chain_path,
|
||||
"-verify_other", self.chain_path,
|
||||
"-trust_other",
|
||||
"-respout", filepath,
|
||||
"-text"]
|
||||
try:
|
||||
output, _ = util.run_script(cmd, log=logger.debug)
|
||||
except errors.SubprocessError:
|
||||
logger.info("OCSP check failed for %s (are we offline?)", self.cert_path)
|
||||
return False
|
||||
|
||||
if not self._request_success(output):
|
||||
raise errors.OCSPRequestError("OCSP request returned an error")
|
||||
try:
|
||||
if self._response_successful(output, self.cert_path):
|
||||
return os.path.isfile(filepath)
|
||||
except errors.OCSPRevokedError:
|
||||
logger.warning("Certificate %s is revoked.", self.cert_path)
|
||||
raise
|
||||
return False
|
||||
|
||||
|
||||
class RevocationChecker(OCSPBase):
|
||||
"This class figures out OCSP checking on this system, and performs it."
|
||||
|
||||
def __init__(self):
|
||||
@@ -20,7 +118,7 @@ class RevocationChecker(object):
|
||||
self.broken = True
|
||||
return
|
||||
|
||||
# New versions of openssl want -header var=val, old ones want -header var val
|
||||
# New versions of openssl want -header var=val, old ones want -header var val
|
||||
test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"],
|
||||
stdout=PIPE, stderr=PIPE, universal_newlines=True)
|
||||
_out, err = test_host_format.communicate()
|
||||
@@ -69,30 +167,6 @@ class RevocationChecker(object):
|
||||
return _translate_ocsp_query(cert_path, output, err)
|
||||
|
||||
|
||||
def determine_ocsp_server(self, cert_path):
|
||||
"""Extract the OCSP server host from a certificate.
|
||||
|
||||
:param str cert_path: Path to the cert we're checking OCSP for
|
||||
:rtype tuple:
|
||||
:returns: (OCSP server URL or None, OCSP server host or None)
|
||||
|
||||
"""
|
||||
try:
|
||||
url, _err = util.run_script(
|
||||
["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"],
|
||||
log=logger.debug)
|
||||
except errors.SubprocessError:
|
||||
logger.info("Cannot extract OCSP URI from %s", cert_path)
|
||||
return None, None
|
||||
|
||||
url = url.rstrip()
|
||||
host = url.partition("://")[2].rstrip("/")
|
||||
if host:
|
||||
return url, host
|
||||
else:
|
||||
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
|
||||
return None, None
|
||||
|
||||
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
|
||||
"""Parse openssl's weird output to work out what it means."""
|
||||
|
||||
|
||||
@@ -143,6 +143,60 @@ class AutoHSTSEnhancement(object):
|
||||
:type domains: str
|
||||
"""
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class OCSPPrefetchEnhancement(object):
|
||||
"""
|
||||
Enhancement interface that installer plugins can implement in order to
|
||||
provide functionality that prefetches an OCSP response and stores it
|
||||
to be served for incoming client requests.
|
||||
|
||||
The plugins implementing new style enhancements are responsible of handling
|
||||
the saving of configuration checkpoints as well as calling possible restarts
|
||||
of managed software themselves. For update_ocsp_prefetch method, the installer
|
||||
may have to call prepare() to finalize the plugin initialization.
|
||||
|
||||
Methods:
|
||||
enable_ocsp_prefetch is called when the domain is configured to
|
||||
serve OCSP responses using mechanism called OCSP Stapling.
|
||||
|
||||
update_ocsp_prefetch is called every time when Certbot is run using 'renew'
|
||||
verb. Certbot should proceed to make a request to the OCSP server in order
|
||||
to fetch an OCSP response and to store the recieved response, if valid.
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def update_ocsp_prefetch(self, lineage, *args, **kwargs):
|
||||
"""
|
||||
Gets called for each lineage every time Certbot is run with 'renew' verb.
|
||||
Implementation of this method should fetch a fresh OCSP response and if
|
||||
valid, store it to be served for connecting clients.
|
||||
|
||||
:param lineage: Certificate lineage object
|
||||
:type lineage: certbot.storage.RenewableCert
|
||||
|
||||
.. note:: prepare() method inherited from `interfaces.IPlugin` might need
|
||||
to be called manually within implementation of this interface method
|
||||
to finalize the plugin initialization.
|
||||
"""
|
||||
|
||||
|
||||
@abc.abstractmethod
|
||||
def enable_ocsp_prefetch(self, lineage, domains, *args, **kwargs):
|
||||
"""
|
||||
Enables the OCSP enhancement, enabling OCSP Stapling functionality for
|
||||
the controlled software, and sets it up for prefetching the responses
|
||||
over the subsequent runs of Certbot renew.
|
||||
|
||||
:param lineage: Certificate lineage object
|
||||
:type lineage: certbot.storage.RenewableCert
|
||||
|
||||
:param domains: List of domains in certificate to enhance
|
||||
:type domains: str
|
||||
"""
|
||||
|
||||
|
||||
# This is used to configure internal new style enhancements in Certbot. These
|
||||
# enhancement interfaces need to be defined in this file. Please do not modify
|
||||
# this list from plugin code.
|
||||
@@ -160,5 +214,19 @@ _INDEX = [
|
||||
"updater_function": "update_autohsts",
|
||||
"deployer_function": "deploy_autohsts",
|
||||
"enable_function": "enable_autohsts"
|
||||
},
|
||||
{
|
||||
"name": "OCSPPrefetch",
|
||||
"cli_help": "Prefetch OCSP responses for certificates in order to be "+
|
||||
"able to serve connecting clients fresh staple immediately",
|
||||
"cli_flag": "--ocsp-prefetch",
|
||||
"cli_flag_default": constants.CLI_DEFAULTS["ocsp_prefetch"],
|
||||
"cli_groups": ["security", "enhance"],
|
||||
"cli_dest": "ocsp_prefetch",
|
||||
"cli_action": "store_true",
|
||||
"class": OCSPPrefetchEnhancement,
|
||||
"updater_function": "update_ocsp_prefetch",
|
||||
"deployer_function": None,
|
||||
"enable_function": "enable_ocsp_prefetch"
|
||||
}
|
||||
] # type: List[Dict[str, Any]]
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
"""Tests for certbot.crypto_util."""
|
||||
import binascii
|
||||
import logging
|
||||
import os
|
||||
import unittest
|
||||
@@ -383,6 +384,15 @@ class Sha256sumTest(unittest.TestCase):
|
||||
'914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e')
|
||||
|
||||
|
||||
class Sha1FingerprintTest(unittest.TestCase):
|
||||
"""Tests certbot.crypto_util.cert_sha1_fingerprint"""
|
||||
|
||||
def test_sha1fingerprint(self):
|
||||
from certbot.crypto_util import cert_sha1_fingerprint
|
||||
self.assertEqual(cert_sha1_fingerprint(CERT_PATH),
|
||||
binascii.unhexlify("09f8ce01450d288467c3326ac0457e351939c72e"))
|
||||
|
||||
|
||||
class CertAndChainFromFullchainTest(unittest.TestCase):
|
||||
"""Tests for certbot.crypto_util.cert_and_chain_from_fullchain"""
|
||||
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
"""Tests for ocsp.py"""
|
||||
# pylint: disable=protected-access
|
||||
|
||||
import mock
|
||||
import os
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from certbot import errors
|
||||
from certbot import ocsp
|
||||
|
||||
from certbot.tests import util
|
||||
|
||||
from cryptography.x509.oid import AuthorityInformationAccessOID
|
||||
|
||||
out = """Missing = in header key=value
|
||||
ocsp: Use -help for summary.
|
||||
@@ -15,7 +20,6 @@ class OCSPTest(unittest.TestCase):
|
||||
|
||||
|
||||
def setUp(self):
|
||||
from certbot import ocsp
|
||||
with mock.patch('certbot.ocsp.Popen') as mock_popen:
|
||||
with mock.patch('certbot.util.exe_exists') as mock_exists:
|
||||
mock_communicate = mock.MagicMock()
|
||||
@@ -27,6 +31,18 @@ class OCSPTest(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def _call_mock_cert(self, func, *args, **kwargs):
|
||||
"""Helper method that uses mocked certificate object for testing"""
|
||||
|
||||
ocsp_ext = mock.MagicMock()
|
||||
ocsp_ext.access_method = AuthorityInformationAccessOID.OCSP
|
||||
ocsp_ext.access_location.value = "http://ocsp.stg-int-x1.letsencrypt.org/"
|
||||
mock_cert = mock.MagicMock()
|
||||
mock_cert.extensions.get_extension_for_oid.return_value = mock.MagicMock(value=[ocsp_ext])
|
||||
with mock.patch('certbot.crypto_util.load_cert') as load_cert:
|
||||
load_cert.return_value = mock_cert
|
||||
return func(*args, **kwargs)
|
||||
|
||||
@mock.patch('certbot.ocsp.logger.info')
|
||||
@mock.patch('certbot.ocsp.Popen')
|
||||
@mock.patch('certbot.util.exe_exists')
|
||||
@@ -36,7 +52,6 @@ class OCSPTest(unittest.TestCase):
|
||||
mock_popen.return_value = mock_communicate
|
||||
mock_exists.return_value = True
|
||||
|
||||
from certbot import ocsp
|
||||
checker = ocsp.RevocationChecker()
|
||||
self.assertEqual(mock_popen.call_count, 1)
|
||||
self.assertEqual(checker.host_args("x"), ["Host=x"])
|
||||
@@ -73,26 +88,20 @@ class OCSPTest(unittest.TestCase):
|
||||
|
||||
|
||||
@mock.patch('certbot.ocsp.logger.info')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_determine_ocsp_server(self, mock_run, mock_info):
|
||||
def test_determine_ocsp_server(self, mock_info):
|
||||
uri = "http://ocsp.stg-int-x1.letsencrypt.org/"
|
||||
host = "ocsp.stg-int-x1.letsencrypt.org"
|
||||
mock_run.return_value = uri, ""
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host))
|
||||
mock_run.return_value = "ftp:/" + host + "/", ""
|
||||
self.assertEqual(
|
||||
self._call_mock_cert(self.checker.determine_ocsp_server, "path_to_cert"),
|
||||
(uri, host))
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
||||
self.assertEqual(mock_info.call_count, 1)
|
||||
|
||||
c = "confusion"
|
||||
mock_run.side_effect = errors.SubprocessError(c)
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
||||
|
||||
@mock.patch('certbot.ocsp.logger')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_translate_ocsp(self, mock_run, mock_log):
|
||||
# pylint: disable=protected-access,star-args
|
||||
mock_run.return_value = openssl_confused
|
||||
from certbot import ocsp
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_happy), False)
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_confused), False)
|
||||
self.assertEqual(mock_log.debug.call_count, 1)
|
||||
@@ -112,6 +121,61 @@ class OCSPTest(unittest.TestCase):
|
||||
self.assertEqual(mock_log.info.call_count, 1)
|
||||
|
||||
|
||||
class OCSPResponseHandlerTest(util.TempDirTestCase):
|
||||
"""Tests for certbot.ocsp.OCSPResponseHandler"""
|
||||
|
||||
def setUp(self):
|
||||
super(OCSPResponseHandlerTest, self).setUp()
|
||||
self.handler = ocsp.OCSPResponseHandler("blah.pem", "chainpath")
|
||||
self.response_filep = os.path.join(self.tempdir, "ocsp_response")
|
||||
|
||||
def _call_mocked(self, output, func, *args, **kwargs):
|
||||
"""Helper method to mock subprocess.Popen and reading OCSP url from cert"""
|
||||
# openssl call creates the output file
|
||||
open(self.response_filep, 'w').close()
|
||||
with mock.patch('certbot.util.run_script') as mock_popen:
|
||||
mock_popen.return_value = output, ""
|
||||
with mock.patch('certbot.ocsp.OCSPBase.determine_ocsp_server') as mock_url:
|
||||
mock_url.return_value = "http://ocsp.example.com", ""
|
||||
return func(*args, **kwargs)
|
||||
|
||||
@mock.patch("certbot.ocsp.logger.info")
|
||||
def test_queryfail(self, mock_log):
|
||||
with mock.patch('certbot.util.run_script', side_effect=errors.SubprocessError):
|
||||
with mock.patch('certbot.ocsp.OCSPBase.determine_ocsp_server') as mock_url:
|
||||
mock_url.return_value = "http://ocsp.example.com", ""
|
||||
self.assertFalse(self.handler.ocsp_request_to_file(self.response_filep))
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertEqual(mock_log.call_args[0][0],
|
||||
'OCSP check failed for %s (are we offline?)')
|
||||
|
||||
def test_queryunsuccessful(self):
|
||||
self.assertRaises(errors.OCSPRequestError,
|
||||
self._call_mocked,
|
||||
"OCSP Response Status: nope",
|
||||
self.handler.ocsp_request_to_file,
|
||||
self.response_filep)
|
||||
|
||||
def test_revoked(self):
|
||||
self.assertRaises(errors.OCSPRevokedError,
|
||||
self._call_mocked,
|
||||
openssl_revoked[1],
|
||||
self.handler.ocsp_request_to_file,
|
||||
self.response_filep)
|
||||
|
||||
def test_nogood(self):
|
||||
self.assertFalse(self._call_mocked(
|
||||
openssl_unknown[1],
|
||||
self.handler.ocsp_request_to_file,
|
||||
self.response_filep))
|
||||
|
||||
def test_success_full(self):
|
||||
self.assertTrue(self._call_mocked(
|
||||
openssl_full_success,
|
||||
self.handler.ocsp_request_to_file,
|
||||
self.response_filep))
|
||||
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
openssl_confused = ("", """
|
||||
/etc/letsencrypt/live/example.org/cert.pem: good
|
||||
@@ -124,6 +188,8 @@ Response Verify Failure
|
||||
""")
|
||||
|
||||
openssl_happy = ("blah.pem", """
|
||||
OCSP Response Data:
|
||||
OCSP Response Status: successful (0x0)
|
||||
blah.pem: good
|
||||
This Update: Dec 20 18:00:00 2016 GMT
|
||||
Next Update: Dec 27 18:00:00 2016 GMT
|
||||
@@ -131,6 +197,8 @@ blah.pem: good
|
||||
"Response verify OK")
|
||||
|
||||
openssl_revoked = ("blah.pem", """
|
||||
OCSP Response Data:
|
||||
OCSP Response Status: successful (0x0)
|
||||
blah.pem: revoked
|
||||
This Update: Dec 20 01:00:00 2016 GMT
|
||||
Next Update: Dec 27 01:00:00 2016 GMT
|
||||
@@ -139,6 +207,8 @@ blah.pem: revoked
|
||||
"""Response verify OK""")
|
||||
|
||||
openssl_unknown = ("blah.pem", """
|
||||
OCSP Response Data:
|
||||
OCSP Response Status: successful (0x0)
|
||||
blah.pem: unknown
|
||||
This Update: Dec 20 18:00:00 2016 GMT
|
||||
Next Update: Dec 27 18:00:00 2016 GMT
|
||||
@@ -165,5 +235,52 @@ revoked
|
||||
""",
|
||||
"""Response verify OK""")
|
||||
|
||||
openssl_full_success = """
|
||||
OCSP Request Data:
|
||||
Version: 1 (0x0)
|
||||
Requestor List:
|
||||
Certificate ID:
|
||||
Hash Algorithm: sha1
|
||||
Issuer Name Hash: C29C130A07D1FF36475F8766B701C13205DF6527
|
||||
Issuer Key Hash: C0CC0346B95820CC5C7270F3E12ECB20A6F5683A
|
||||
Serial Number: FA185757440D467FCCBA5D832824354BD74C
|
||||
OCSP Response Data:
|
||||
OCSP Response Status: successful (0x0)
|
||||
Response Type: Basic OCSP Response
|
||||
Version: 1 (0x0)
|
||||
Responder Id: CN = Fake LE Intermediate X1
|
||||
Produced At: Oct 18 09:53:00 2018 GMT
|
||||
Responses:
|
||||
Certificate ID:
|
||||
Hash Algorithm: sha1
|
||||
Issuer Name Hash: C29C130A07D1FF36475F8766B701C13205DF6527
|
||||
Issuer Key Hash: C0CC0346B95820CC5C7270F3E12ECB20A6F5683A
|
||||
Serial Number: FA185757440D467FCCBA5D832824354BD74C
|
||||
Cert Status: good
|
||||
This Update: Oct 18 09:00:00 2018 GMT
|
||||
Next Update: Oct 25 09:00:00 2018 GMT
|
||||
|
||||
Signature Algorithm: sha256WithRSAEncryption
|
||||
a1:15:14:c9:53:e0:5d:3d:fb:79:f6:1e:a4:be:a6:b3:bd:52:
|
||||
59:5e:b0:a9:cb:8e:3b:65:e6:9a:cc:cc:5d:45:64:d9:64:5d:
|
||||
a2:1c:c7:71:aa:94:27:bf:ee:9d:2c:53:70:3b:66:c7:41:d4:
|
||||
78:7e:cb:b7:c7:72:36:aa:c6:d3:a6:50:c6:4a:e4:d4:16:c8:
|
||||
34:26:57:f8:ee:10:d3:ea:2d:6e:2b:e3:54:92:c7:bd:00:84:
|
||||
30:03:cc:62:cd:f4:48:71:2c:1a:3f:0c:b9:a8:42:3e:60:83:
|
||||
dc:c8:27:41:54:e3:f6:5a:a5:b6:00:a4:d4:30:48:e5:bf:d6:
|
||||
55:98:02:a3:95:c9:04:08:af:23:f9:3c:bc:68:57:d5:13:a0:
|
||||
63:2d:14:9f:72:f1:a6:06:28:98:76:26:04:c8:9f:2e:1c:e8:
|
||||
f3:be:44:64:74:9c:8b:72:94:2f:e5:73:bd:38:99:77:b3:fc:
|
||||
bf:10:4e:d4:87:a1:0f:9f:2b:02:fa:6a:eb:67:e7:4c:fc:ef:
|
||||
32:29:e6:f7:8a:ad:56:7b:a7:a7:c0:0e:95:01:46:df:98:1e:
|
||||
4a:2b:72:99:14:96:06:a8:fc:59:c8:9b:3d:e0:e4:4e:8d:f5:
|
||||
aa:90:1a:db:39:44:04:f0:ef:34:6a:90:cb:48:38:fe:ec:34:
|
||||
77:78:97:56
|
||||
Response verify OK
|
||||
blah.pem: good
|
||||
This Update: Oct 18 09:00:00 2018 GMT
|
||||
Next Update: Oct 25 09:00:00 2018 GMT
|
||||
"""
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
||||
Reference in New Issue
Block a user