Compare commits

...

57 Commits

Author SHA1 Message Date
Joona Hoikkala
0976176a56 Temporary fix, REVERT 2020-04-30 01:29:52 +03:00
Joona Hoikkala
ae76f2d3b5 Fix the OCSP api call 2020-04-30 01:20:15 +03:00
Joona Hoikkala
839b86871d Move cert_sha1_fingerprint to to internal apache_util 2020-04-22 20:56:48 +03:00
Joona Hoikkala
1e5d13f212 Implement deploy hook for ocsp prefetch functionality 2020-04-22 20:46:10 +03:00
Joona Hoikkala
c3ebf331e9 Fix merge issues 2020-04-22 19:39:05 +03:00
Joona Hoikkala
2d9e9c8aef Merge remote-tracking branch 'origin/master' into ocsp_apache_continued 2020-04-22 19:18:41 +03:00
Joona Hoikkala
741278ef67 Use certificate file path as key for the internal storage and remove revoked and deleted certificates from pool when met 2020-04-16 00:50:19 +03:00
Joona Hoikkala
57cd0c7d81 Address review comments 2020-04-09 02:42:26 +03:00
Joona Hoikkala
895330e009 Use filesystem.replace for atomic move operations 2020-03-04 20:21:00 +02:00
Joona Hoikkala
56cb226bd4 Merge branch 'ocsp_apache_continued' of github.com:certbot/certbot into ocsp_apache_continued 2020-02-26 20:08:58 +02:00
Joona Hoikkala
dc255aeb4f Fix tests and handle PluginError 2020-02-26 20:08:20 +02:00
Joona Hoikkala
b0feb33b9b Fix the backup target path 2020-02-26 18:51:25 +02:00
Joona Hoikkala
a42cf70f71 More review comment fixes 2020-02-19 20:44:37 +02:00
Joona Hoikkala
593ed9c8c4 Update certbot/docs/using.rst
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-19 18:11:41 +02:00
Joona Hoikkala
d6dafb0a1b Merge branch 'ocsp_apache_continued' of github.com:certbot/certbot into ocsp_apache_continued 2020-02-19 18:07:58 +02:00
Joona Hoikkala
f192cbf12a Address review comments 2020-02-19 18:07:30 +02:00
Joona Hoikkala
5a8032d5cc Update certbot-apache/certbot_apache/_internal/prefetch_ocsp.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-19 18:04:30 +02:00
Joona Hoikkala
62d08e032a Update certbot-apache/setup.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-19 17:14:51 +02:00
Joona Hoikkala
83b73aeb93 Update certbot-apache/setup.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-19 17:14:07 +02:00
Joona Hoikkala
a446e124e4 Fix tests after moving ocsp to public api 2020-02-12 17:55:10 +02:00
Joona Hoikkala
8ddc17fd2c Merge remote-tracking branch 'origin/master' into ocsp_apache_continued 2020-02-12 17:42:41 +02:00
Joona Hoikkala
7e4e0d8bdd Add user guide documentation 2020-02-05 18:20:52 +02:00
Joona Hoikkala
caf2ad2cb1 Add overview documentation of the functionality to .py 2020-02-05 17:00:08 +02:00
Joona Hoikkala
17af868f62 Update certbot-apache/certbot_apache/_internal/apache_util.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 22:04:01 +02:00
Joona Hoikkala
4b3dea8be6 Update certbot-apache/certbot_apache/_internal/configurator.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 21:58:42 +02:00
Joona Hoikkala
719142e28d Update certbot/certbot/plugins/enhancements.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 21:57:56 +02:00
Joona Hoikkala
d7778b0f5e Update certbot/certbot/crypto_util.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 21:56:42 +02:00
Joona Hoikkala
590d81c3ae Update certbot/certbot/crypto_util.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 21:56:24 +02:00
Joona Hoikkala
6395cc2b48 Copy dbm file to work directory before writing 2020-02-04 20:13:28 +02:00
Joona Hoikkala
1ad23f9db0 Move DBM handling to a context manager 2020-02-04 13:13:04 +02:00
Joona Hoikkala
a5d739f8ff Update certbot/certbot/_internal/ocsp.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 10:44:59 +02:00
Joona Hoikkala
d785f8c534 Update certbot/certbot/_internal/ocsp.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 10:44:25 +02:00
Joona Hoikkala
975025207f Update certbot/certbot/_internal/ocsp.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 10:43:53 +02:00
Joona Hoikkala
a02b092620 Update certbot/certbot/_internal/ocsp.py
Co-Authored-By: Brad Warren <bmw@users.noreply.github.com>
2020-02-04 10:43:12 +02:00
Joona Hoikkala
fd74aba422 Address review comments 2020-02-03 22:18:52 +02:00
Joona Hoikkala
b6ea34c61d Address review comments 2020-01-31 20:06:52 +02:00
Joona Hoikkala
a8a106c325 Small fixes 2020-01-30 16:58:14 +02:00
Joona Hoikkala
4138259c51 Add certbot-apache tests and mypy type hints 2020-01-27 15:10:04 +02:00
Joona Hoikkala
dd9f76c60c Reorder new ocsp functions and add tests 2020-01-27 14:03:24 +02:00
Joona Hoikkala
549061249f Parse producedAt, thisUpdate and nextUpdate values from OCSP response and calculate Apache internal TTL 2020-01-26 23:42:29 +02:00
Joona Hoikkala
53f8ad88db Enable OCSP and revocation checking based on certificate and chain filepaths 2020-01-26 15:42:01 +02:00
Joona Hoikkala
fe0a985228 Call restart() from superclass from OCSPPrefetchMixin 2020-01-24 22:35:22 +02:00
Joona Hoikkala
dfa8b2a2cd Make pip use the local version of certbot-apache for oldest tests 2020-01-24 22:06:19 +02:00
Joona Hoikkala
18dddd1eb5 Fix dependency requirement versions to dev0 2020-01-24 16:35:18 +02:00
Joona Hoikkala
a7f934701f Add test to ensure dbm recovery if restart fails 2020-01-24 16:16:37 +02:00
Joona Hoikkala
8ca967a0f4 Update dependency versions 2020-01-24 16:10:49 +02:00
Joona Hoikkala
a9ce156d9c Restore dbm database if Apache restart fails 2020-01-24 16:06:11 +02:00
Joona Hoikkala
0904062015 Add link to mypy issue in super() init call 2020-01-24 16:01:50 +02:00
Joona Hoikkala
6cfc493a71 Move restart() override and interface registration to OCSPPrefetchMixin 2020-01-24 15:02:25 +02:00
Joona Hoikkala
dad0ca3505 Merge remote-tracking branch 'origin/master' into ocsp_apache_continued 2020-01-22 20:51:30 +02:00
Joona Hoikkala
fa8a68d45f Move the OCSP prefetch functionality to a mixin class 2020-01-22 20:51:09 +02:00
Joona Hoikkala
11fce9a870 Add a crypto_util test and mark few lines as no cover 2019-12-20 20:19:00 +02:00
Joona Hoikkala
857f98d4ec Fix tests 2019-12-19 14:35:36 +02:00
Joona Hoikkala
065e3de422 Add changelog entry 2019-12-19 13:34:35 +02:00
Joona Hoikkala
a9f4498cc0 Merge remote-tracking branch 'origin/master' into ocsp_apache_continued 2019-12-19 13:31:31 +02:00
Joona Hoikkala
f5dc50491c Enchancement, tests, hook to core 2019-12-19 13:27:18 +02:00
Joona Hoikkala
17797b948c Refactoring to latest master 2019-12-02 11:30:12 +02:00
17 changed files with 1481 additions and 29 deletions

View File

@@ -1,20 +1,130 @@
""" Utility functions for certbot-apache plugin """
import binascii
import fnmatch
import hashlib
import logging
import re
import shutil
import struct
import subprocess
import time
from cryptography.hazmat.primitives import hashes # type: ignore
import pkg_resources
from certbot import crypto_util
from certbot import errors
from certbot import util
from certbot.compat import os
# TEMPORARY WORKAROUND
import os as stdlib_os
logger = logging.getLogger(__name__)
def get_apache_ocsp_struct(ttl, ocsp_response):
"""Create Apache OCSP response structure to be used in response cache
:param int ttl: Time-To-Live in seconds
:param str ocsp_response: OCSP response data
:returns: Apache OCSP structure
:rtype: `str`
"""
ttl = time.time() + ttl
# As microseconds
ttl_struct = struct.pack('l', int(ttl*1000000))
return b'\x01'.join([ttl_struct, ocsp_response])
def certid_sha1_hex(cert_path):
"""Hex representation of certificate SHA1 fingerprint
:param str cert_path: File path to certificate
:returns: Hex representation SHA1 fingerprint of certificate
:rtype: `str`
"""
sha1_hex = binascii.hexlify(certid_sha1(cert_path))
return sha1_hex.decode('utf-8')
def certid_sha1(cert_path):
"""SHA1 fingerprint of certificate
:param str cert_path: File path to certificate
:returns: SHA1 fingerprint bytestring
:rtype: `str`
"""
return cert_sha1_fingerprint(cert_path)
def safe_copy(source, target):
"""Copies a file, while verifying the target integrity
with the source. Retries twice if the initial
copy fails.
:param str source: File path of the source file
:param str target: File path of the target file
:raises: .errors.PluginError: If file cannot be
copied or the target file hash does not match
with the source file.
"""
orig_perms = None
try:
orig_perms = stdlib_os.stat(target)
except OSError:
# target file was not found
pass
for _ in range(3):
try:
shutil.copy2(source, target)
if orig_perms:
stdlib_os.chown(target, orig_perms.st_uid)
stdlib_os.chmod(target, oct(orig_perms.st_mode & 0o777))
except IOError as e:
emsg = "Could not copy {} to {}: {}".format(
source, target, e
)
raise errors.PluginError(emsg)
time.sleep(1)
try:
source_hash = _file_hash(source)
target_hash = _file_hash(target)
except IOError:
continue
if source_hash == target_hash:
return
raise errors.PluginError(
"Safe copy failed. The file integrity does not match"
)
def _file_hash(filepath):
"""Helper function for safe_copy that calculates a
sha-256 hash of file.
:param str filepath: Path of file to calculate hash for
:returns: File sha-256 hash
:rtype: str
"""
fhash = hashlib.sha256()
with open(filepath, 'rb') as fh:
fhash.update(fh.read())
return fhash.hexdigest()
def get_mod_deps(mod_name):
"""Get known module dependencies.
@@ -244,6 +354,7 @@ def _get_runtime_cfg(command):
return stdout
def find_ssl_apache_conf(prefix):
"""
Find a TLS Apache config file in the dedicated storage.
@@ -254,3 +365,16 @@ def find_ssl_apache_conf(prefix):
return pkg_resources.resource_filename(
"certbot_apache",
os.path.join("_internal", "tls_configs", "{0}-options-ssl-apache.conf".format(prefix)))
def cert_sha1_fingerprint(cert_path):
"""Read a certificate by its file path and return its SHA-1 fingerprint.
:param str cert_path: File path to the x509 certificate file
:returns: SHA-1 fingerprint of the certificate
:rtype: bytes
"""
cert = crypto_util.load_cert(cert_path)
return cert.fingerprint(hashes.SHA1())

View File

@@ -27,6 +27,7 @@ from acme.magic_typing import Union
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
from certbot.compat import filesystem
from certbot.compat import os
@@ -1929,7 +1930,7 @@ class ApacheConfigurator(common.Installer):
self.parser.find_dir("SSLCertificateKeyFile",
lineage.key_path, vhost.path))
def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
def _enable_ocsp_stapling(self, ssl_vhost, unused_options, prefetch=False):
"""Enables OCSP Stapling
In OCSP, each client (e.g. browser) would have to query the
@@ -1950,6 +1951,9 @@ class ApacheConfigurator(common.Installer):
:param unused_options: Not currently used
:type unused_options: Not Available
:param prefetch: Use OCSP prefetching
:type prefetch: bool
:returns: Success, general_vhost (HTTP vhost)
:rtype: (bool, :class:`~certbot_apache._internal.obj.VirtualHost`)
@@ -1960,8 +1964,15 @@ class ApacheConfigurator(common.Installer):
"Unable to set OCSP directives.\n"
"Apache version is below 2.3.3.")
if "socache_shmcb_module" not in self.parser.modules:
self.enable_mod("socache_shmcb")
if prefetch:
if "socache_dbm_module" not in self.parser.modules:
self.enable_mod("socache_dbm")
cache_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
cache_dir = ["dbm:"+cache_path]
else:
if "socache_shmcb_module" not in self.parser.modules:
self.enable_mod("socache_shmcb")
cache_dir = ["shmcb:/var/run/apache2/stapling_cache(128000)"]
# Check if there's an existing SSLUseStapling directive on.
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
@@ -1981,9 +1992,7 @@ class ApacheConfigurator(common.Installer):
self.parser.aug.remove(
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
"SSLStaplingCache",
["shmcb:/var/run/apache2/stapling_cache(128000)"])
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path, "SSLStaplingCache", cache_dir)
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
ssl_vhost.filep)

View File

@@ -70,3 +70,6 @@ AUTOHSTS_FREQ = 172800
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
"""Managed by Certbot comments and the VirtualHost identification template"""
OCSP_INTERNAL_TTL = 86400
"""Internal TTL for OCSP response in seconds: 1 day"""

View File

@@ -8,14 +8,16 @@ from certbot import interfaces
from certbot import util
from certbot.compat import filesystem
from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import configurator
from certbot_apache._internal import prefetch_ocsp
logger = logging.getLogger(__name__)
@zope.interface.provider(interfaces.IPluginFactory)
class DebianConfigurator(configurator.ApacheConfigurator):
class DebianConfigurator(prefetch_ocsp.OCSPPrefetchMixin, configurator.ApacheConfigurator):
"""Debian specific ApacheConfigurator override class"""
OS_DEFAULTS = dict(

View File

@@ -0,0 +1,461 @@
"""A mixin class for OCSP response prefetching for Apache plugin.
The OCSP prefetching functionality solves multiple issues in Apache httpd
that make using OCSP must-staple error prone.
The prefetching functionality works by storing a value to PluginStorage,
noting certificates that Certbot should keep OCSP staples (OCSP responses)
updated for alongside of the information when the last response was
updated by Certbot.
When Certbot is invoked, typically by scheduled "certbot renew" and the
TTL from "lastupdate" value in PluginStorage entry has expired,
Certbot then proceeds to fetch a new OCSP response from the OCSP servers
pointed by the certificate.
The OCSP response is validated and if valid, stored to Apache DBM
cache. A high internal cache expiry value is set for Apache in order
to make it to not to discard the stored response and try to renew
the staple itself letting Certbot to renew it on its subsequent run
instead.
The DBM cache file used by Apache is a lightweight key-value storage.
For OCSP response caching, the sha1 hash of certificate fingerprint
is used as a key. The value consists of expiry time as timestamp
in microseconds, \x01 delimiter and the raw OCSP response.
When restarting Apache, Certbot backups the current OCSP response
cache, and restores it after the restart has happened. This is
done because Apache deletes and then recreates the file upon
restart.
"""
from datetime import datetime
import logging
import time
from acme.magic_typing import Dict, Union # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import ocsp
from certbot.plugins.enhancements import OCSPPrefetchEnhancement
from certbot.compat import filesystem
from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import constants
logger = logging.getLogger(__name__)
class DBMHandler(object):
"""Context manager to handle DBM file reads and writes"""
def __init__(self, filename, mode):
self.filename = filename
self.filemode = mode
self.bsddb = False
self.database = None
def __enter__(self):
"""Open the DBM file and return the filehandle"""
try:
import bsddb
self.bsddb = True
try:
self.database = bsddb.hashopen(self.filename, self.filemode)
except Exception:
raise errors.PluginError("Unable to open dbm database file.")
except ImportError:
# Python3 doesn't have bsddb module, so we use dbm.ndbm instead
import dbm
if self.filename.endswith(".db"):
self.filename = self.filename[:-3]
try:
self.database = dbm.ndbm.open(self.filename, self.filemode) # pylint: disable=no-member
except Exception:
# This is raised if a file cannot be found
raise errors.PluginError("Unable to open dbm database file.")
return self.database
def __exit__(self, *args):
"""Close the DBM file"""
if self.bsddb:
self.database.sync()
self.database.close()
class OCSPPrefetchMixin(object):
"""OCSPPrefetchMixin implements OCSP response prefetching"""
def __init__(self, *args, **kwargs):
self._ocsp_prefetch = {} # type: Dict[str, Dict[str, Union[str, float]]]
# This is required because of python super() call chain.
# Additionally, mypy isn't able to figure the chain out and needs to be
# disabled for this line. See https://github.com/python/mypy/issues/5887
super(OCSPPrefetchMixin, self).__init__(*args, **kwargs) # type: ignore
self._ocsp_store = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
self._ocsp_work = os.path.join(self.config.work_dir, "ocsp_work", "ocsp_cache.db")
def _ensure_ocsp_dirs(self):
"""Makes sure that the OCSP directory paths exist."""
for path in [os.path.dirname(self._ocsp_work),
os.path.dirname(self._ocsp_store)]:
if not os.path.isdir(path):
filesystem.makedirs(path, 0o755)
def _ensure_ocsp_prefetch_compatibility(self):
"""Make sure that the operating system supports the required libraries
to manage Apache DBM files.
:raises: errors.NotSupportedError
"""
try:
import bsddb # pylint: disable=unused-variable
except ImportError:
import dbm
if not hasattr(dbm, 'ndbm') or dbm.ndbm.library != 'Berkeley DB': # pylint: disable=no-member
msg = ("Unfortunately your operating system does not have a "
"compatible database module available for managing "
"Apache OCSP stapling cache database.")
raise errors.NotSupportedError(msg)
def _ocsp_refresh_needed(self, lastupdate):
"""Refreshes OCSP response for a certificate if it's due
:param int lastupdate: Last update timestamp from pluginstorage entry
:returns: If OCSP response was updated
:rtype: bool
"""
ttl = lastupdate + constants.OCSP_INTERNAL_TTL
if ttl < time.time():
return True
return False
def _ocsp_refresh(self, cert_path, chain_path):
"""Refresh the OCSP response for a certificate
:param str cert_path: Filesystem path to certificate file
:param str chain_path: Filesystem path to certificate chain file
"""
self._ensure_ocsp_dirs()
try:
ocsp_workfile = os.path.join(
os.path.dirname(self._ocsp_work),
apache_util.certid_sha1_hex(cert_path))
handler = ocsp.RevocationChecker()
if not os.path.isfile(cert_path):
raise OCSPCertificateError("Certificate has been removed from the system.")
if not handler.ocsp_revoked_by_paths(cert_path, chain_path, 10, ocsp_workfile):
# Guaranteed good response
cert_sha = apache_util.certid_sha1(cert_path)
# dbm.open automatically adds the file extension
self._write_to_dbm(self._ocsp_store, cert_sha,
self._ocsp_response_dbm(ocsp_workfile))
else:
logger.warning("Encountered an issue while trying to prefetch OCSP "
"response for certificate: %s", cert_path)
raise OCSPCertificateError("Certificate has been revoked.")
finally:
try:
os.remove(ocsp_workfile)
except OSError:
# The OCSP workfile did not exist because of an OCSP response fetching error
pass
def _write_to_dbm(self, filename, key, value):
"""Helper method to write an OCSP response cache value to DBM.
:param filename: DBM database filename
:param bytes key: Database key name
:param bytes value: Database entry value
"""
tmp_file = os.path.join(
os.path.dirname(self._ocsp_work),
"tmp_" + os.path.basename(filename)
)
apache_util.safe_copy(filename, tmp_file)
with DBMHandler(tmp_file, 'w') as db:
db[key] = value
filesystem.replace(tmp_file, filename)
def _ocsp_ttl(self, next_update):
"""Calculates Apache internal TTL for the next OCSP staple
update.
The resulting TTL is half of the time between now
and the time noted by nextUpdate field in OCSP response.
If nextUpdate value is None, a default value will be
used instead.
:param next_update: datetime value for nextUpdate or None
:returns: TTL in seconds.
:rtype: int
"""
# hour in seconds
hour = 3600
suberror = ""
if next_update is not None:
now = datetime.utcnow()
res_ttl = int((next_update - now).total_seconds())
if res_ttl > 0:
safe_ttl = res_ttl - 30 * hour
if safe_ttl > hour:
# Use nextUpdate - 30h if it's over an hour from now
return safe_ttl
else:
suberror = ("OCSP response nextUpdate timestamp too "
"early: {}. Certbot cannot ensure a safe TTL"
"for OCSP staple prefeching.").format(next_update)
else:
suberror = ("OCSP response nextUpdate timestamp too "
"early: {}").format(next_update)
else:
suberror = ("OCSP response nextUpdate not provided with response. "
"Staple should not be prefetched.")
raise errors.PluginError(suberror)
def _ocsp_response_dbm(self, workfile):
"""Creates a dbm entry for OCSP response data
:param str workfile: File path for raw OCSP response
:returns: OCSP response cache data that Apache can use
:rtype: string
"""
handler = ocsp.RevocationChecker()
_, _, next_update = handler.ocsp_times(workfile)
ttl = self._ocsp_ttl(next_update)
with open(workfile, 'rb') as fh:
response = fh.read()
return apache_util.get_apache_ocsp_struct(ttl, response)
def _ocsp_prefetch_save(self, cert_path, chain_path):
"""Saves status of current OCSP prefetch, including the last update
time to determine if an update is needed on later run.
:param str cert_path: Filesystem path to certificate
:param str chain_path: Filesystem path to certificate chain file
"""
status = {
"lastupdate": time.time(),
"chain_path": chain_path
}
self._ocsp_prefetch[cert_path] = status
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
self.storage.save()
def _ocsp_prefetch_remove(self, cert_path):
"""Removes OCSP prefetch configuration from PluginStorage object for
a certificate.
:param str cert_path: Filesystem path to certificate
"""
self._ocsp_prefetch.pop(cert_path)
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
def _ocsp_prefetch_fetch_state(self):
"""
Populates the OCSP prefetch state from the pluginstorage.
"""
try:
self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch")
except KeyError:
self._ocsp_prefetch = dict()
def _ocsp_prefetch_backup_db(self):
"""
Copies the active dbm file to work directory. Logs a debug error
message if unable to copy, but does not error out as it would
prevent other critical functions that need to be carried out for
Apache httpd.
Erroring out here would prevent any restarts done by Apache plugin.
"""
self._ensure_ocsp_dirs()
try:
apache_util.safe_copy(
self._ocsp_store,
self._ocsp_work
)
except errors.PluginError:
logger.debug("Encountered an issue while trying to backup OCSP dbm file")
def _ocsp_prefetch_restore_db(self):
"""
Restores the active dbm file from work directory. Logs a debug error
message if unable to restore, but does not error out as it would
prevent other critical functions that need to be carried out for
Apache httpd.
Erroring out here would prevent any restarts done by Apache plugin.
"""
self._ensure_ocsp_dirs()
try:
filesystem.replace(self._ocsp_work, self._ocsp_store)
except IOError:
logger.debug("Encountered an issue when trying to restore OCSP dbm file")
def enable_ocsp_prefetch(self, lineage, domains):
"""Enable OCSP Stapling and prefetching of the responses.
In OCSP, each client (e.g. browser) would have to query the
OCSP Responder to validate that the site certificate was not revoked.
Enabling OCSP Stapling would allow the web-server to query the OCSP
Responder, and staple its response to the offered certificate during
TLS. i.e. clients would not have to query the OCSP responder.
OCSP prefetching functionality addresses some of the pain points in
the implementation that's currently preset in Apache httpd. The
mitigation provided by Certbot are:
* OCSP staples get backed up before, and restored after httpd restart
* Valid OCSP staples do not get overwritten with errors in case of
network connectivity or OCSP responder issues
* The staples get updated asynchronically in the background instead
of blocking a incoming request.
"""
# Fail early if we are not able to support this
self._ensure_ocsp_prefetch_compatibility()
prefetch_vhosts = set()
for domain in domains:
matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
# We should be handling only SSL vhosts
for vh in matched_vhosts:
if vh.ssl:
prefetch_vhosts.add(vh)
if not prefetch_vhosts:
raise errors.MisconfigurationError(
"Could not find VirtualHost to enable OCSP prefetching on."
)
try:
# The try - block is huge, but required for handling rollback properly.
for vh in prefetch_vhosts:
self._enable_ocsp_stapling(vh, None, prefetch=True)
self._ensure_ocsp_dirs()
self.restart()
# Ensure Apache has enough time to properly restart and create the file
time.sleep(2)
self._ocsp_refresh(lineage.cert_path, lineage.chain_path)
self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path)
self.save("Enabled OCSP prefetching")
except errors.PluginError as err:
# Revert the OCSP prefetch configuration
self.recovery_routine()
self.restart()
msg = ("Encountered an error while trying to enable OCSP prefetch "
"enhancement: %s\nOCSP prefetch was not enabled.")
raise errors.PluginError(msg % str(err))
def deploy_ocsp_prefetch(self, lineage):
"""When certificate gets renewed, ensure that we're able to serve an appropriate OCSP
staple after the restart that replaces the certificate."""
self._ocsp_prefetch_fetch_state()
if not self._ocsp_prefetch:
# No OCSP prefetching enabled for any certificate
return
if lineage.cert_path in self._ocsp_prefetch:
pf = self._ocsp_prefetch[lineage.cert_path]
try:
self._ocsp_try_refresh(lineage.cert_path)
except OCSPCertificateError:
# This error was logged and handled already down the stack. Return to avoid save.
return
self._ocsp_prefetch_save(lineage.cert_path, pf["chain_path"])
def update_ocsp_prefetch(self, _unused_lineage):
"""Checks all certificates that are managed by OCSP prefetch, and
refreshes OCSP responses for them if required."""
self._ocsp_prefetch_fetch_state()
if not self._ocsp_prefetch:
# No OCSP prefetching enabled for any certificate
return
# make a copy of the list of dictionary keys as we might remove items mid-iteration
for cert_path in list(self._ocsp_prefetch):
pf = self._ocsp_prefetch[cert_path]
if self._ocsp_refresh_needed(pf["lastupdate"]):
try:
self._ocsp_try_refresh(cert_path)
except OCSPCertificateError:
# We want to skip saving in this case, as we just removed the
# certificate from prefetch pool.
continue
self._ocsp_prefetch_save(cert_path, pf["chain_path"])
def _ocsp_try_refresh(self, cert_path):
"""Attempt to refresh OCSP staple for a certificate.
:param str cert_path: Path to certificate
"""
pf = self._ocsp_prefetch[cert_path]
try:
self._ocsp_refresh(cert_path, pf["chain_path"])
except OCSPCertificateError as err:
self._ocsp_prefetch_remove(cert_path)
msg = ("Error when trying to prefetch OCSP staple: {} " +
"OCSP prefetch functionality removed for the certificate").format(err)
logger.warning(msg)
raise
except errors.PluginError as err:
msg = "Encountered a issue when trying to renew OCSP staple: {}".format(err)
logger.warning(msg)
def restart(self):
"""Reloads the Apache server. When restarting, Apache deletes
the DBM cache file used to store OCSP staples. In this override
function, Certbot checks the pluginstorage if we're supposed to
manage OCSP prefetching. If needed, Certbot will backup the DBM
file, restoring it after calling restart.
:raises .errors.MisconfigurationError: If either the config test
or reload fails.
"""
if not self._ocsp_prefetch:
# Try to populate OCSP prefetch structure from pluginstorage
self._ocsp_prefetch_fetch_state()
if self._ocsp_prefetch:
# OCSP prefetching is enabled, so back up the db
self._ocsp_prefetch_backup_db()
try:
# Ignored because mypy doesn't know that this class is used as
# a mixin and fails because object has no restart method.
super(OCSPPrefetchMixin, self).restart() # type: ignore
finally:
if self._ocsp_prefetch:
# Restore the backed up dbm database
self._ocsp_prefetch_restore_db()
class OCSPCertificateError(errors.PluginError):
"""Error that prompts for removal of certificate from OCSP prefetch pool."""
OCSPPrefetchEnhancement.register(OCSPPrefetchMixin) # pylint: disable=no-member

View File

@@ -1,3 +1,3 @@
# Remember to update setup.py to match the package versions below.
acme[dev]==0.29.0
certbot[dev]==1.1.0
-e certbot[dev]

View File

@@ -12,7 +12,8 @@ version = '1.4.0.dev0'
# acme/certbot version.
install_requires = [
'acme>=0.29.0',
'certbot>=1.1.0',
'certbot>=1.4.0.dev0',
'mock',
'python-augeas',
'setuptools',
'zope.component',

View File

@@ -0,0 +1,521 @@
"""Test for certbot_apache._internal.configurator OCSP Prefetching functionality"""
import base64
from datetime import datetime
from datetime import timedelta
import json
import sys
import tempfile
import time
import unittest
import mock
# six is used in mock.patch()
import six # pylint: disable=unused-import
from acme.magic_typing import Dict, List, Set, Union # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot.compat import os
import util
from certbot_apache._internal.prefetch_ocsp import DBMHandler, OCSPCertificateError
class MockDBM(object):
# pylint: disable=missing-docstring
"""Main mock DBM class for Py3 dbm module"""
def __init__(self, library='Berkeley DB'):
self.ndbm = Mockdbm_impl(library)
class Mockdbm_impl(object):
"""Mock dbm implementation that satisfies both bsddb and dbm interfaces"""
# pylint: disable=missing-docstring
def __init__(self, library='Berkeley DB'):
self.library = library
self.name = 'ndbm'
def open(self, path, mode):
return Mockdb(path, mode)
def hashopen(self, path, mode):
return Mockdb(path, mode)
class Mockdb(object):
"""Mock dbm.db for both bsddb and dbm databases"""
# pylint: disable=missing-docstring
def __init__(self, path, mode):
self._data = dict() # type: Dict[str, str]
if mode == "r" or mode == "w":
if not path.endswith(".db"):
path = path+".db"
with open(path, 'r') as fh:
try:
self._data = json.loads(fh.read())
except Exception: # pylint: disable=broad-except
self._data = dict()
self.path = path
self.mode = mode
def __setitem__(self, key, item):
bkey = base64.b64encode(key)
bitem = base64.b64encode(item)
self._data[bkey.decode()] = bitem.decode()
def __getitem__(self, key):
bkey = base64.b64encode(key)
return base64.b64decode(self._data[bkey.decode()])
def keys(self):
return [base64.b64decode(k) for k in self._data.keys()]
def sync(self):
return
def close(self):
with open(self.path, 'w') as fh:
fh.write(json.dumps(self._data))
class OCSPPrefetchTest(util.ApacheTest):
"""Tests for OCSP Prefetch feature"""
# pylint: disable=protected-access
def setUp(self): # pylint: disable=arguments-differ
super(OCSPPrefetchTest, self).setUp()
self.config = util.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir,
os_info="debian")
_, self.tmp_certfile = tempfile.mkstemp()
self.lineage = mock.MagicMock(cert_path=self.tmp_certfile, chain_path="chain")
self.config.parser.modules["headers_module"] = None
self.config.parser.modules["mod_headers.c"] = None
self.config.parser.modules["ssl_module"] = None
self.config.parser.modules["mod_ssl.c"] = None
self.config.parser.modules["socache_dbm_module"] = None
self.config.parser.modules["mod_socache_dbm.c"] = None
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
self.config._ensure_ocsp_dirs()
self.db_path = os.path.join(self.work_dir, "ocsp", "ocsp_cache") + ".db"
def tearDown(self):
os.remove(self.tmp_certfile)
def _call_mocked(self, func, *args, **kwargs):
"""Helper method to call functins with mock stack"""
def mock_restart():
"""Mock ApacheConfigurator.restart that creates the dbm file"""
# Mock the Apache dbm file creation
open(self.db_path, 'a').close()
ver_path = "certbot_apache._internal.configurator.ApacheConfigurator.get_version"
res_path = "certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin.restart"
cry_path = "certbot_apache._internal.apache_util.cert_sha1_fingerprint"
with mock.patch(ver_path) as mock_ver:
mock_ver.return_value = (2, 4, 10)
with mock.patch(cry_path) as mock_cry:
mock_cry.return_value = b'j\x056\x1f\xfa\x08B\xe8D\xa1Bn\xeb*A\xebWx\xdd\xfe'
with mock.patch(res_path, side_effect=mock_restart):
return func(*args, **kwargs)
def call_mocked_py2(self, func, *args, **kwargs):
"""Calls methods with imports mocked to suit Py2 environment"""
if 'dbm' in sys.modules.keys(): # pragma: no cover
sys.modules['dbm'] = None
sys.modules['bsddb'] = Mockdbm_impl()
return self._call_mocked(func, *args, **kwargs)
def call_mocked_py3(self, func, *args, **kwargs):
"""Calls methods with imports mocked to suit Py3 environment"""
real_import = six.moves.builtins.__import__
def mock_import(*args, **kwargs):
"""Mock import to raise ImportError for Py2 specific module to make
ApacheConfigurator pick the correct one for Python3 regardless of the
python version the tests are running under."""
if args[0] == "bsddb":
raise ImportError
return real_import(*args, **kwargs)
with mock.patch('six.moves.builtins.__import__', side_effect=mock_import):
sys.modules['dbm'] = MockDBM()
return self._call_mocked(func, *args, **kwargs)
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator.enable_mod")
def test_ocsp_prefetch_enable_mods(self, mock_enable):
self.config.parser.modules.pop("socache_dbm_module", None)
self.config.parser.modules.pop("mod_socache_dbm.c", None)
self.config.parser.modules.pop("headers_module", None)
self.config.parser.modules.pop("mod_header.c", None)
ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
with mock.patch(ref_path):
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
self.lineage, ["ocspvhost.com"])
self.assertTrue(mock_enable.called)
self.assertEqual(len(self.config._ocsp_prefetch), 1)
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator.enable_mod")
def test_ocsp_prefetch_enable_error(self, _mock_enable):
ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
self.config.recovery_routine = mock.MagicMock()
with mock.patch(ref_path, side_effect=errors.PluginError("failed")):
self.assertRaises(errors.PluginError,
self.call_mocked_py2,
self.config.enable_ocsp_prefetch,
self.lineage,
["ocspvhost.com"])
self.assertTrue(self.config.recovery_routine.called)
def test_ocsp_prefetch_vhost_not_found_error(self):
choose_path = "certbot_apache._internal.override_debian.DebianConfigurator.choose_vhosts"
with mock.patch(choose_path) as mock_choose:
mock_choose.return_value = []
self.assertRaises(errors.MisconfigurationError,
self.call_mocked_py2,
self.config.enable_ocsp_prefetch,
self.lineage,
["domainnotfound"])
def test_deploy_ocsp_prefetch_noop(self):
dumb_lineage = mock.MagicMock(cert_path="not_found")
refresh = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_try_refresh"
with mock.patch(refresh) as mock_refresh:
self.config.deploy_ocsp_prefetch(dumb_lineage)
self.assertFalse(mock_refresh.called)
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator._ocsp_try_refresh")
def test_deploy_ocsp_prefetch(self, mock_refresh):
self.config._ocsp_prefetch_save("artificial_path", "irrelevant")
mock_lineage = mock.MagicMock(cert_path="artificial_path")
s_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_prefetch_save"
with mock.patch(s_path) as mock_save:
self.config.deploy_ocsp_prefetch(mock_lineage)
self.assertTrue(mock_refresh.called)
self.assertTrue(mock_save.called)
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator._ocsp_try_refresh")
def test_deploy_ocsp_prefetch_error(self, mock_refresh):
self.config._ocsp_prefetch_save("artificial_path", "irrelevant")
mock_lineage = mock.MagicMock(cert_path="artificial_path")
mock_refresh.side_effect = OCSPCertificateError("Error")
s_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_prefetch_save"
with mock.patch(s_path) as mock_save:
self.config.deploy_ocsp_prefetch(mock_lineage)
self.assertTrue(mock_refresh.called)
self.assertFalse(mock_save.called)
@mock.patch("certbot_apache._internal.constants.OCSP_INTERNAL_TTL", 0)
def test_ocsp_prefetch_refresh(self):
def ocsp_req_mock(_cert, _chain, workfile):
"""Method to mock the OCSP request and write response to file"""
with open(workfile, 'w') as fh:
fh.write("MOCKRESPONSE")
return False
ocsp_path = "certbot.ocsp.RevocationChecker.ocsp_revoked_by_paths"
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
with mock.patch('certbot.ocsp.RevocationChecker.ocsp_times') as mock_times:
produced_at = datetime.today() - timedelta(days=1)
this_update = datetime.today() - timedelta(days=2)
next_update = datetime.today() + timedelta(days=2)
mock_times.return_value = produced_at, this_update, next_update
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
self.lineage, ["ocspvhost.com"])
odbm = _read_dbm(self.db_path)
self.assertEqual(len(odbm.keys()), 1)
# The actual response data is prepended by Apache timestamp
self.assertTrue(odbm[list(odbm.keys())[0]].endswith(b'MOCKRESPONSE'))
with mock.patch(ocsp_path, side_effect=ocsp_req_mock) as mock_ocsp:
with mock.patch('certbot.ocsp.RevocationChecker.ocsp_times') as mock_times:
produced_at = datetime.today() - timedelta(days=1)
this_update = datetime.today() - timedelta(days=2)
next_update = datetime.today() + timedelta(days=2)
mock_times.return_value = produced_at, this_update, next_update
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
self.assertTrue(mock_ocsp.called)
def test_ocsp_prefetch_refresh_cert_deleted(self):
self.config._ocsp_prefetch_save("nonexistent_cert_path", "irrelevant")
self.assertEqual(len(self.config._ocsp_prefetch), 1)
rn_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh_needed"
with mock.patch(rn_path) as mock_needed:
mock_needed.return_value = True
with mock.patch("certbot_apache._internal.prefetch_ocsp.logger.warning") as mock_log:
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
self.assertTrue(mock_log.called)
self.assertTrue("has been removed" in mock_log.call_args[0][0])
self.assertEqual(len(self.config._ocsp_prefetch), 0)
def test_ocsp_prefetch_refresh_noop(self):
self.config._ocsp_prefetch_save(self.lineage.cert_path, "irrelevant")
refresh_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
with mock.patch(refresh_path) as mock_refresh:
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
self.assertFalse(mock_refresh.called)
def test_ocsp_prefetch_refresh_error(self):
self.config._ocsp_prefetch_save(self.lineage.cert_path, "irrelevant")
refresh_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
rn_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh_needed"
with mock.patch(refresh_path) as mock_refresh:
mock_refresh.side_effect = errors.PluginError("Could not update")
with mock.patch("certbot_apache._internal.prefetch_ocsp.logger.warning") as mock_log:
with mock.patch(rn_path) as mock_needed:
mock_needed.return_value = True
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
self.assertTrue(mock_log.called)
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
def test_ocsp_prefetch_backup_db(self, _mock_test):
def ocsp_del_db():
"""Side effect of _reload() that deletes the DBM file, like Apache
does when restarting"""
os.remove(self.db_path)
self.assertFalse(os.path.isfile(self.db_path))
# Make sure that the db file exists
open(self.db_path, 'a').close()
self.call_mocked_py2(self.config._write_to_dbm, self.db_path, b'mock_key', b'mock_value')
# Mock OCSP prefetch dict to signify that there should be a db
self.config._ocsp_prefetch = {"mock": "value"}
rel_path = "certbot_apache._internal.configurator.ApacheConfigurator._reload"
with mock.patch(rel_path, side_effect=ocsp_del_db):
self.config.restart()
odbm = _read_dbm(self.db_path)
self.assertEqual(odbm[b'mock_key'], b'mock_value')
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test):
log_path = "certbot_apache._internal.prefetch_ocsp.logger.debug"
log_string = "Encountered an issue while trying to backup OCSP dbm file"
log_string2 = "Encountered an issue when trying to restore OCSP dbm file"
self.config._ocsp_prefetch = {"mock": "value"}
with mock.patch("certbot.compat.filesystem.replace", side_effect=IOError):
with mock.patch(log_path) as mock_log:
self.config.restart()
self.assertTrue(mock_log.called)
self.assertEqual(mock_log.call_count, 2)
self.assertTrue(log_string in mock_log.call_args_list[0][0][0])
self.assertTrue(log_string2 in mock_log.call_args_list[1][0][0])
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin.restart")
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
ocsp_path = "certbot.ocsp.RevocationChecker.ocsp_revoked_by_paths"
log_path = "certbot_apache._internal.prefetch_ocsp.logger.warning"
with mock.patch(ocsp_path) as mock_ocsp:
mock_ocsp.return_value = True
with mock.patch(log_path) as mock_log:
self.assertRaises(errors.PluginError,
self.call_mocked_py2,
self.config.enable_ocsp_prefetch,
self.lineage, ["ocspvhost.com"]
)
self.assertTrue(mock_log.called)
self.assertTrue(
"trying to prefetch OCSP" in mock_log.call_args[0][0])
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh_needed")
def test_ocsp_prefetch_update_noop(self, mock_refresh):
self.config.update_ocsp_prefetch(None)
self.assertFalse(mock_refresh.called)
def test_ocsp_prefetch_preflight_check_noerror(self):
self.call_mocked_py2(self.config._ensure_ocsp_prefetch_compatibility)
self.call_mocked_py3(self.config._ensure_ocsp_prefetch_compatibility)
real_import = six.moves.builtins.__import__
def mock_import(*args, **kwargs):
if args[0] == "bsddb":
raise ImportError
return real_import(*args, **kwargs)
with mock.patch('six.moves.builtins.__import__', side_effect=mock_import):
sys.modules['dbm'] = MockDBM(library='Not Berkeley DB')
self.assertRaises(errors.NotSupportedError,
self._call_mocked,
self.config._ensure_ocsp_prefetch_compatibility)
def test_ocsp_prefetch_open_dbm_no_file(self):
open(self.db_path, 'a').close()
db_not_exists = self.db_path+"nonsense"
self.call_mocked_py2(self.config._write_to_dbm, self.db_path, b'k', b'v')
self.assertRaises(errors.PluginError,
self.call_mocked_py2,
self.config._write_to_dbm,
db_not_exists,
b'k', b'v')
def test_ocsp_prefetch_py2_open_file_error(self):
open(self.db_path, 'a').close()
mock_db = mock.MagicMock()
mock_db.hashopen.side_effect = Exception("error")
sys.modules["bsddb"] = mock_db
self.assertRaises(errors.PluginError,
self.config._write_to_dbm,
self.db_path,
b'k', b'v')
def test_ocsp_prefetch_py3_open_file_error(self):
open(self.db_path, 'a').close()
mock_db = mock.MagicMock()
mock_db.ndbm.open.side_effect = Exception("error")
sys.modules["dbm"] = mock_db
sys.modules["bsddb"] = None
self.assertRaises(errors.PluginError,
self.config._write_to_dbm,
self.db_path,
b'k', b'v')
def test_ocsp_prefetch_open_close_py2_noerror(self):
expected_val = b'whatever_value'
open(self.db_path, 'a').close()
self.call_mocked_py2(
self.config._write_to_dbm, self.db_path,
b'key', expected_val
)
db2 = self.call_mocked_py2(_read_dbm, self.db_path)
self.assertEqual(db2[b'key'], expected_val)
def test_ocsp_prefetch_open_close_py3_noerror(self):
expected_val = b'whatever_value'
open(self.db_path, 'a').close()
self.call_mocked_py3(
self.config._write_to_dbm, self.db_path,
b'key', expected_val
)
db2 = self.call_mocked_py3(_read_dbm, self.db_path)
self.assertEqual(db2[b'key'], expected_val)
def test_ocsp_prefetch_safe_open_hash_mismatch(self):
import random
def mock_hash(_filepath):
# shound not be the same value twice
return str(random.getrandbits(1024))
open(self.db_path, 'a').close()
with mock.patch("certbot_apache._internal.apache_util._file_hash", side_effect=mock_hash):
self.assertRaises(
errors.PluginError,
self.call_mocked_py3,
self.config._write_to_dbm, self.db_path,
b'anything', b'irrelevant'
)
def test_ocsp_prefetch_safe_open_hash_fail_open(self):
open(self.db_path, 'a').close()
with mock.patch("certbot_apache._internal.apache_util._file_hash", side_effect=IOError):
self.assertRaises(
errors.PluginError,
self.call_mocked_py3,
self.config._write_to_dbm, self.db_path,
b'anything', b'irrelevant'
)
def test_ttl_no_nextupdate(self):
self.assertRaises(errors.PluginError,
self.config._ocsp_ttl,
None)
def test_ttl_nextupdate_in_past(self):
next_update = datetime.utcnow() - timedelta(hours=1)
self.assertRaises(errors.PluginError,
self.config._ocsp_ttl,
next_update)
def test_ttl_nextupdate_not_enough_leeway(self):
next_update = datetime.utcnow() + timedelta(hours=29)
self.assertRaises(errors.PluginError,
self.config._ocsp_ttl,
next_update)
def test_ttl_ok(self):
next_update = datetime.utcnow() + timedelta(hours=32)
ttl = self.config._ocsp_ttl(next_update)
self.assertTrue(ttl > 7100 and ttl < 7201)
def test_ttl(self):
next_update = datetime.utcnow() + timedelta(days=6)
ttl = self.config._ocsp_ttl(next_update)
# ttl should be 30h from next_update
self.assertTrue(ttl < timedelta(days=4, hours=19).total_seconds())
self.assertTrue(ttl > timedelta(days=4, hours=17).total_seconds())
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_fetch_state")
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
def test_restart_load_state_call(self, _rl, _ct, mock_fch):
self.assertFalse(self.config._ocsp_prefetch)
self.config.restart()
self.assertTrue(mock_fch.called)
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_backup_db")
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_restore_db")
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
def test_restart_backupdb_call(self, _rl, _ctest, mock_rest, mock_bck):
self.config._ocsp_prefetch = True
self.config.restart()
self.assertTrue(mock_rest.called)
self.assertTrue(mock_bck.called)
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_backup_db")
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_restore_db")
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
def test_restart_recover_error(self, mock_reload, _ctest, mock_rest, mock_bck):
self.config._ocsp_prefetch = True
mock_reload.side_effect = errors.MisconfigurationError
self.assertRaises(errors.MisconfigurationError, self.config.restart)
self.assertTrue(mock_bck.called)
self.assertTrue(mock_rest.called)
class CertFingerprintTest(unittest.TestCase):
"""Tests for certbot_apache._internal.apache_util.cert_sha1_fingerprint"""
def test_cert_sha1_fingerprint(self):
import certbot.tests.util as test_util
from certbot_apache._internal.apache_util import cert_sha1_fingerprint
cert_path = test_util.vector_path('cert_512.pem')
self.assertEqual(
cert_sha1_fingerprint(cert_path),
b'\t\xf8\xce\x01E\r(\x84g\xc32j\xc0E~5\x199\xc7.'
)
def _read_dbm(filename):
"""Helper method for reading the dbm using context manager.
Used for tests.
:param str filename: DBM database filename
:returns: Dictionary of database keys and values
:rtype: dict
"""
ret = dict()
with DBMHandler(filename, 'r') as db:
for k in db.keys():
ret[k] = db[k]
return ret
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -6,6 +6,9 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
### Added
* OCSP prefetching functionality for Apache plugin that attempts to refresh the OCSP
response cache for managed certificates when scheduled Certbot renew is being run.
* Added certbot.ocsp Certbot's API. The certbot.ocsp module can be used to
* Turn off session tickets for apache plugin by default when appropriate.
* Added serial number of certificate to the output of `certbot certificates`
* Expose two new environment variables in the authenticator and cleanup scripts used by

View File

@@ -60,6 +60,7 @@ CLI_DEFAULTS = dict(
redirect=None,
auto_hsts=False,
hsts=None,
ocsp_prefetch=False,
uir=None,
staple=None,
strict_permissions=False,

View File

@@ -805,6 +805,11 @@ def install(config, plugins):
"If your certificate is managed by Certbot, please use --cert-name "
"to define which certificate you would like to install.")
# It's important that the old style enhancements get enabled before
# the new style ones, as some of the new enhancements can modify the
# same configuration directives. _install_cert() in the above block
# handles the old style enhancements here.
if enhancements.are_requested(config):
# In the case where we don't have certname, we have errored out already
lineage = cert_manager.lineage_for_certname(config, config.certname)
@@ -925,9 +930,14 @@ def enhance(config, plugins):
lineage = cert_manager.lineage_for_certname(config, config.certname)
if not config.chain_path:
config.chain_path = lineage.chain_path
if not config.cert_path:
config.cert_path = lineage.cert_path
if oldstyle_enh:
le_client = _init_le_client(config, authenticator=None, installer=installer)
le_client.enhance_config(domains, config.chain_path, redirect_default=False)
# It's important that the old style enhancements get enabled before
# the new style ones, as some of the new enhancements can modify the
# same configuration directives.
if enhancements.are_requested(config):
enhancements.enable(lineage, domains, installer, config)
@@ -1106,7 +1116,10 @@ def run(config, plugins):
_report_new_cert(config, cert_path, fullchain_path, key_path)
_install_cert(config, le_client, domains, new_lineage)
# It's important that the old style enhancements get enabled before
# the new style ones, as some of the new enhancements can modify the
# same configuration directives. _install_cert() handles the old
# style enhancements here.
if enhancements.are_requested(config) and new_lineage:
enhancements.enable(new_lineage, domains, installer, config)

View File

@@ -223,6 +223,19 @@ def verify_renewable_cert(renewable_cert):
verify_cert_matches_priv_key(renewable_cert.cert_path, renewable_cert.key_path)
def load_cert(cert_path):
"""Reads the certificate PEM file and returns a cryptography.x509.Certificate object.
:param str cert_path: Path to the certificate
:rtype: `cryptography.x509.Certificate`
:returns: x509 certificate object
"""
with open(cert_path, 'rb') as fh:
cert_pem = fh.read()
return x509.load_pem_x509_certificate(cert_pem, default_backend())
def verify_renewable_cert_sig(renewable_cert):
"""Verifies the signature of a RenewableCert object.
@@ -295,7 +308,7 @@ def verify_cert_matches_priv_key(cert_path, key_path):
error_str = "verifying the cert located at {0} matches the \
private key located at {1} has failed. \
Details: {2}".format(cert_path,
key_path, e)
key_path, e)
logger.exception(error_str)
raise errors.Error(error_str)
@@ -449,7 +462,7 @@ def _notAfterBefore(cert_path, method):
# pylint: disable=redefined-outer-name
with open(cert_path) as f:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM,
f.read())
f.read())
# pyopenssl always returns bytes
timestamp = method(x509)
reformatted_timestamp = [timestamp[0:4], b"-", timestamp[4:6], b"-",

View File

@@ -65,26 +65,26 @@ class RevocationChecker(object):
.. todo:: Make this a non-blocking call
:param `.interfaces.RenewableCert` cert: Certificate object
:returns: True if revoked; False if valid or the check failed or cert is expired.
:rtype: bool
"""
return self.ocsp_revoked_by_paths(cert.cert_path, cert.chain_path)
def ocsp_revoked_by_paths(self, cert_path, chain_path, timeout=10):
# type: (str, str, int) -> bool
def ocsp_revoked_by_paths(self, cert_path, chain_path, timeout=10, response_file=None):
# type: (str, str, int, Optional[str]) -> bool
"""Performs the OCSP revocation check
:param str cert_path: Certificate filepath
:param str cert_path: Certificate path
:param str chain_path: Certificate chain
:param int timeout: Timeout (in seconds) for the OCSP query
:param str response_file: File path where the raw OCSP response should be written
:returns: True if revoked; False if valid or the check failed or cert is expired.
:rtype: bool
"""
if self.broken:
return False
# Let's Encrypt doesn't update OCSP for expired certificates,
# so don't check OCSP if the cert is expired.
@@ -93,16 +93,37 @@ class RevocationChecker(object):
if crypto_util.notAfter(cert_path) <= now:
return False
if self.broken:
return False
url, host = _determine_ocsp_server(cert_path)
if not host or not url:
return False
if self.use_openssl_binary:
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url,
timeout, response_file)
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file)
def ocsp_times(self, response_file):
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
"""
Reads OCSP response file and returns producedAt, thisUpdate
and nextUpdate values in datetime format, or None if an error
occurs.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime or None
"""
if self.use_openssl_binary:
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout)
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout)
return _ocsp_times_openssl_bin(response_file)
return _ocsp_times_cryptography(response_file)
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, timeout):
# type: (str, str, str, str, int) -> bool
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, timeout, response_file=None): # pylint: disable=line-too-long
# type: (str, str, str, str, int, Optional[str]) -> bool
# Minimal implementation of proxy selection logic as seen in, e.g., cURL
# Some things that won't work, but may well be in use somewhere:
# - username and password for proxy authentication
@@ -124,11 +145,16 @@ class RevocationChecker(object):
"-no_nonce",
"-issuer", chain_path,
"-cert", cert_path,
"-url", url,
"-CAfile", chain_path,
"-verify_other", chain_path,
"-trust_other",
"-timeout", str(timeout),
"-header"] + self.host_args(host) + url_opts
if response_file: # pragma: no cover
cmd += ["-respout", response_file]
logger.debug("Querying OCSP for %s", cert_path)
logger.debug(" ".join(cmd))
try:
@@ -170,8 +196,58 @@ def _determine_ocsp_server(cert_path):
return None, None
def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):
# type: (str, str, str, int) -> bool
def _ocsp_times_openssl_bin(response_file):
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
"""
Reads OCSP response file using OpenSSL binary and returns
producedAt, thisUpdate and nextUpdate values in datetime format.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime or None
"""
cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file]
logger.debug("Reading OCSP response from file: %s", response_file)
logger.debug(" ".join(cmd))
try:
output, _ = util.run_script(cmd, log=logger.debug)
except errors.SubprocessError:
logger.info("Reading OCSP response from file failed.")
return None, None, None
prod_str, this_str, next_str = _translate_ocsp_response_times(output)
prod_dt = _parse_datetime(prod_str)
this_dt = _parse_datetime(this_str)
next_dt = _parse_datetime(next_str)
return prod_dt, this_dt, next_dt
def _ocsp_times_cryptography(response_file):
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
"""
Reads OCSP response using cryptography and returns producedAt,
thisUpdate and nextUpdate values in datetime format, or None
if the file cannot be opened.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime or None
"""
try:
with open(response_file, 'rb') as fh:
raw_response = fh.read()
except OSError:
return None, None, None
response = ocsp.load_der_ocsp_response(raw_response)
return response.produced_at, response.this_update, response.next_update
def _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file=None):
# type: (str, str, str, int, Optional[str]) -> bool
# Retrieve OCSP response
with open(chain_path, 'rb') as file_handler:
issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
@@ -192,6 +268,10 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):
logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code)
return False
if response_file: # pragma: no cover
with open(response_file, 'wb') as fh:
fh.write(response.content)
response_ocsp = ocsp.load_der_ocsp_response(response.content)
# Check OCSP response validity
@@ -327,3 +407,54 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
logger.warning("Unable to properly parse OCSP output: %s\nstderr:%s",
ocsp_output, ocsp_errors)
return False
def _translate_ocsp_response_times(response):
# type: (str) -> Tuple[str, str, str]
"""
Parse openssl OCSP response output and return producedAt,
thisUpdate and nextUpdate values.
:param str response: OpenSSL OCSP response output
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of str
"""
prod_pattern = "Produced At: (.+)$"
this_pattern = "This Update: (.+)$"
next_pattern = "Next Update: (.+)$"
prod_date = ""
this_date = ""
next_date = ""
prod_match = re.search(prod_pattern, response, flags=re.MULTILINE)
if prod_match:
prod_date = prod_match.group(1)
this_match = re.search(this_pattern, response, flags=re.MULTILINE)
if this_match:
this_date = this_match.group(1)
next_match = re.search(next_pattern, response, flags=re.MULTILINE)
if next_match:
next_date = next_match.group(1)
return prod_date, this_date, next_date
def _parse_datetime(dt_string):
"""
Parses a string to datetime, ignoring timezone.
:param str dt_string: String representation of date and time
:returns: datetime representation of time
:rtype: datetime.datetime or None
"""
try:
dateformat = "%b %d %H:%M:%S %Y %Z"
return datetime.strptime(dt_string, dateformat)
except ValueError:
return None

View File

@@ -156,13 +156,62 @@ class AutoHSTSEnhancement(object):
:type domains: `list` of `str`
"""
@six.add_metaclass(abc.ABCMeta)
class OCSPPrefetchEnhancement(object):
"""
Enhancement interface that installer plugins can implement in order to
provide functionality that prefetches an OCSP response and stores it
to be served for incoming client requests.
The plugins implementing new style enhancements are responsible of handling
the saving of configuration checkpoints as well as calling possible restarts
of managed software themselves. For update_ocsp_prefetch method, the installer
may have to call prepare() to finalize the plugin initialization.
Methods:
enable_ocsp_prefetch is called when the domain is configured to
serve OCSP responses using mechanism called OCSP Stapling.
update_ocsp_prefetch is called every time when Certbot is run using 'renew'
verb. Certbot should proceed to make a request to the OCSP server in order
to fetch an OCSP response and to store the recieved response, if valid.
"""
@abc.abstractmethod
def update_ocsp_prefetch(self, lineage, *args, **kwargs):
"""
Gets called for each lineage every time Certbot is run with 'renew' verb.
Implementation of this method should fetch a fresh OCSP response if it's
needed and if valid, store it to be served for connecting clients.
:param lineage: Certificate lineage object
:type lineage: certbot.interfaces.RenewableCert
.. note:: prepare() method inherited from `interfaces.IPlugin` might need
to be called manually within implementation of this interface method
to finalize the plugin initialization.
"""
@abc.abstractmethod
def enable_ocsp_prefetch(self, lineage, domains, *args, **kwargs):
"""
Enables the OCSP enhancement, enabling OCSP Stapling functionality for
the controlled software, and sets it up for prefetching the responses
over the subsequent runs of Certbot renew.
:param lineage: Certificate lineage object
:type lineage: certbot.interfaces.RenewableCert
:param domains: List of domains in certificate to enhance
:type domains: `list` of `str`
"""
# This is used to configure internal new style enhancements in Certbot. These
# enhancement interfaces need to be defined in this file. Please do not modify
# this list from plugin code.
_INDEX = [
{
"name": "AutoHSTS",
"cli_help": "Gradually increasing max-age value for HTTP Strict Transport "+
"cli_help": "Gradually increasing max-age value for HTTP Strict Transport " +
"Security security header",
"cli_flag": "--auto-hsts",
"cli_flag_default": constants.CLI_DEFAULTS["auto_hsts"],
@@ -173,5 +222,19 @@ _INDEX = [
"updater_function": "update_autohsts",
"deployer_function": "deploy_autohsts",
"enable_function": "enable_autohsts"
},
{
"name": "OCSPPrefetch",
"cli_help": "Prefetch OCSP responses for certificates in order to be " +
"able to serve connecting clients fresh staple immediately",
"cli_flag": "--prefetch-ocsp",
"cli_flag_default": constants.CLI_DEFAULTS["ocsp_prefetch"],
"cli_groups": ["security", "enhance"],
"cli_dest": "ocsp_prefetch",
"cli_action": "store_true",
"class": OCSPPrefetchEnhancement,
"updater_function": "update_ocsp_prefetch",
"deployer_function": "deploy_ocsp_prefetch",
"enable_function": "enable_ocsp_prefetch"
}
] # type: List[Dict[str, Any]]

View File

@@ -935,6 +935,30 @@ changed by passing the desired number to the command line flag
want to alter the log rotation, check `/etc/logrotate.d/` for a
certbot rotation script.
.. _prefetch-ocsp:
Prefetching OCSP responses
==========================
Certbot users on Debian and Ubuntu based operating systems have the option to
configure certbot to handle prefetching and management of OCSP staples in behalf
of the Apache process. This mitigates multiple issues that exist with Apache OCSP
staple handling in cases where there are issues with either network connectivity
or OCSP service availability.
Normally when configuring Apache to handle OCSP stapling, it proceeds to fetch
the initial response from the OCSP server only during the handshake of next
incoming request after the restart. Upon requesting a new OCSP response from the
OCSP server pointed by the certificate, Apache overwrites the already existing
cached response regardless of the validity of the received response.
Certbot tries to fix these issues by configuring the internal expiry of the
Apache OCSP staple cache close to the expiry of the actual OCSP staple as well
as by backing up and restoring the existing OCSP staple cache file when restarting
Apache process.
The OCSP prefetching can be enabled with command line flag `--prefetch-ocsp`.
.. _command-line:
Certbot command-line options

View File

@@ -28,6 +28,7 @@ P256_KEY = test_util.load_vector('nistp256_key.pem')
P256_CERT_PATH = test_util.vector_path('cert-nosans_nistp256.pem')
P256_CERT = test_util.load_vector('cert-nosans_nistp256.pem')
class InitSaveKeyTest(test_util.TempDirTestCase):
"""Tests for certbot.crypto_util.init_save_key."""
def setUp(self):
@@ -372,8 +373,10 @@ class Sha256sumTest(unittest.TestCase):
"""Tests for certbot.crypto_util.notAfter"""
def test_sha256sum(self):
from certbot.crypto_util import sha256sum
self.assertEqual(sha256sum(CERT_PATH),
'914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e')
self.assertEqual(
sha256sum(CERT_PATH),
'914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e'
)
class CertAndChainFromFullchainTest(unittest.TestCase):

View File

@@ -140,6 +140,40 @@ class OCSPTestOpenSSL(unittest.TestCase):
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
self.assertEqual(mock_log.info.call_count, 1)
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times(self, mock_run):
mock_run.return_value = ocsp_times_example
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10))
self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0))
self.assertEqual(nextUpdate, datetime(2020, 1, 31, 11, 0))
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_no_nextupdate(self, mock_run):
mock_run.return_value = ocsp_times_example_nonext
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10))
self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0))
self.assertEqual(nextUpdate, None)
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_badoutput(self, mock_run):
mock_run.return_value = ("Something unparsable", "")
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, None)
self.assertEqual(thisUpdate, None)
self.assertEqual(nextUpdate, None)
@mock.patch('certbot.ocsp.logger')
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_error(self, mock_run, mock_log):
mock_run.side_effect = errors.SubprocessError
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, None)
self.assertEqual(thisUpdate, None)
self.assertEqual(nextUpdate, None)
self.assertEqual(mock_log.info.call_count, 1)
@unittest.skipIf(not ocsp_lib,
reason='This class tests functionalities available only on cryptography>=2.5.0')
@@ -165,11 +199,12 @@ class OSCPTestCryptography(unittest.TestCase):
@mock.patch('certbot.ocsp._determine_ocsp_server')
@mock.patch('certbot.ocsp._check_ocsp_cryptography')
def test_ensure_cryptography_toggled(self, mock_check, mock_determine):
def test_ensure_cryptography_toggled(self, mock_revoke, mock_determine):
mock_determine.return_value = ('http://example.com', 'example.com')
self.checker.ocsp_revoked(self.cert_obj)
mock_check.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10)
mock_revoke.assert_called_once_with(self.cert_path, self.chain_path,
'http://example.com', 10, None)
def test_revoke(self):
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
@@ -279,6 +314,40 @@ class OSCPTestCryptography(unittest.TestCase):
revoked = self.checker.ocsp_revoked(self.cert_obj)
self.assertFalse(revoked)
def test_ocsp_times_cryptography(self):
with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")):
with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load:
resp = mock.MagicMock()
resp.produced_at = datetime(2020, 1, 2, 9, 9)
resp.this_update = datetime(2020, 1, 2, 8, 8)
resp.next_update = datetime(2020, 1, 3, 4, 4)
mock_load.return_value = resp
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9))
self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8))
self.assertEqual(next_update, datetime(2020, 1, 3, 4, 4))
def test_ocsp_times_cryptography_no_nextupdate(self):
with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")):
with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load:
resp = mock.MagicMock()
resp.produced_at = datetime(2020, 1, 2, 9, 9)
resp.this_update = datetime(2020, 1, 2, 8, 8)
resp.next_update = None
mock_load.return_value = resp
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9))
self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8))
self.assertEqual(next_update, None)
def test_ocsp_times_cryptography_error(self):
with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")) as mock_open:
mock_open.side_effect = OSError
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual([produced_at, this_update, next_update], [None, None, None])
@contextlib.contextmanager
def _ocsp_mock(certificate_status, response_status,
@@ -378,6 +447,17 @@ revoked
""",
"""Response verify OK""")
ocsp_times_example = ("""
Produced At: Jan 24 11:10:00 2020 GMT
This Update: Jan 24 11:00:00 2020 GMT
Next Update: Jan 31 11:00:00 2020 GMT
""", "")
ocsp_times_example_nonext = ("""
Produced At: Jan 24 11:10:00 2020 GMT
This Update: Jan 24 11:00:00 2020 GMT
""", "")
if __name__ == '__main__':
unittest.main() # pragma: no cover