Compare commits
37 Commits
refactor-c
...
test-ocsp-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a24db4d53 | ||
|
|
7e4e0d8bdd | ||
|
|
caf2ad2cb1 | ||
|
|
17af868f62 | ||
|
|
4b3dea8be6 | ||
|
|
719142e28d | ||
|
|
d7778b0f5e | ||
|
|
590d81c3ae | ||
|
|
6395cc2b48 | ||
|
|
1ad23f9db0 | ||
|
|
a5d739f8ff | ||
|
|
d785f8c534 | ||
|
|
975025207f | ||
|
|
a02b092620 | ||
|
|
fd74aba422 | ||
|
|
b6ea34c61d | ||
|
|
a8a106c325 | ||
|
|
4138259c51 | ||
|
|
dd9f76c60c | ||
|
|
549061249f | ||
|
|
53f8ad88db | ||
|
|
fe0a985228 | ||
|
|
dfa8b2a2cd | ||
|
|
18dddd1eb5 | ||
|
|
a7f934701f | ||
|
|
8ca967a0f4 | ||
|
|
a9ce156d9c | ||
|
|
0904062015 | ||
|
|
6cfc493a71 | ||
|
|
dad0ca3505 | ||
|
|
fa8a68d45f | ||
|
|
11fce9a870 | ||
|
|
857f98d4ec | ||
|
|
065e3de422 | ||
|
|
a9f4498cc0 | ||
|
|
f5dc50491c | ||
|
|
17797b948c |
10
.travis.yml
10
.travis.yml
@@ -273,13 +273,3 @@ after_success: '[ "$TOXENV" == "py27-cover" ] && codecov -F linux'
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
irc:
|
||||
channels:
|
||||
# This is set to a secure variable to prevent forks from sending
|
||||
# notifications. This value was created by installing
|
||||
# https://github.com/travis-ci/travis.rb and running
|
||||
# `travis encrypt "chat.freenode.net#certbot-devel"`.
|
||||
- secure: "EWW66E2+KVPZyIPR8ViENZwfcup4Gx3/dlimmAZE0WuLwxDCshBBOd3O8Rf6pBokEoZlXM5eDT6XdyJj8n0DLslgjO62pExdunXpbcMwdY7l1ELxX2/UbnDTE6UnPYa09qVBHNG7156Z6yE0x2lH4M9Ykvp0G0cubjPQHylAwo0="
|
||||
on_cancel: never
|
||||
on_success: never
|
||||
on_failure: always
|
||||
|
||||
@@ -1,10 +1,104 @@
|
||||
""" Utility functions for certbot-apache plugin """
|
||||
import binascii
|
||||
import hashlib
|
||||
import struct
|
||||
import shutil
|
||||
import time
|
||||
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot.compat import os
|
||||
|
||||
|
||||
def get_apache_ocsp_struct(ttl, ocsp_response):
|
||||
"""Create Apache OCSP response structure to be used in response cache
|
||||
|
||||
:param int ttl: Time-To-Live in seconds
|
||||
:param str ocsp_response: OCSP response data
|
||||
|
||||
:returns: Apache OCSP structure
|
||||
:rtype: `str`
|
||||
|
||||
"""
|
||||
ttl = time.time() + ttl
|
||||
# As microseconds
|
||||
ttl_struct = struct.pack('l', int(ttl*1000000))
|
||||
return b'\x01'.join([ttl_struct, ocsp_response])
|
||||
|
||||
|
||||
def certid_sha1_hex(cert_path):
|
||||
"""Hex representation of certificate SHA1 fingerprint
|
||||
|
||||
:param str cert_path: File path to certificate
|
||||
|
||||
:returns: Hex representation SHA1 fingerprint of certificate
|
||||
:rtype: `str`
|
||||
|
||||
"""
|
||||
sha1_hex = binascii.hexlify(certid_sha1(cert_path))
|
||||
return sha1_hex.decode('utf-8')
|
||||
|
||||
|
||||
def certid_sha1(cert_path):
|
||||
"""SHA1 fingerprint of certificate
|
||||
|
||||
:param str cert_path: File path to certificate
|
||||
|
||||
:returns: SHA1 fingerprint bytestring
|
||||
:rtype: `str`
|
||||
|
||||
"""
|
||||
return crypto_util.cert_sha1_fingerprint(cert_path)
|
||||
|
||||
|
||||
def safe_copy(source, target):
|
||||
"""Copies a file, while verifying the target integrity
|
||||
with the source. Retries twice if the initial
|
||||
copy fails.
|
||||
|
||||
:param str source: File path of the source file
|
||||
:param str target: File path of the target file
|
||||
|
||||
:raises: .errors.PluginError: If file cannot be
|
||||
copied or the target file hash does not match
|
||||
with the source file.
|
||||
"""
|
||||
for _ in range(3):
|
||||
try:
|
||||
shutil.copy2(source, target)
|
||||
except IOError as e:
|
||||
emsg = "Could not copy {} to {}: {}".format(
|
||||
source, target, e
|
||||
)
|
||||
raise errors.PluginError(emsg)
|
||||
try:
|
||||
source_hash = _file_hash(source)
|
||||
target_hash = _file_hash(target)
|
||||
except IOError:
|
||||
continue
|
||||
if source_hash == target_hash:
|
||||
return
|
||||
raise errors.PluginError(
|
||||
"Safe copy failed. The file integrity does not match"
|
||||
)
|
||||
|
||||
|
||||
def _file_hash(filepath):
|
||||
"""Helper function for safe_copy that calculates a
|
||||
sha-256 hash of file.
|
||||
|
||||
:param str filepath: Path of file to calculate hash for
|
||||
|
||||
:returns: File sha-256 hash
|
||||
:rtype: str
|
||||
"""
|
||||
fhash = hashlib.sha256()
|
||||
with open(filepath, 'rb') as fh:
|
||||
fhash.update(fh.read())
|
||||
return fhash.hexdigest()
|
||||
|
||||
|
||||
def get_mod_deps(mod_name):
|
||||
"""Get known module dependencies.
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ from acme.magic_typing import Union # pylint: disable=unused-import, no-name-in
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
|
||||
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
@@ -1714,7 +1715,7 @@ class ApacheConfigurator(common.Installer):
|
||||
self.parser.find_dir("SSLCertificateKeyFile",
|
||||
lineage.key_path, vhost.path))
|
||||
|
||||
def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
|
||||
def _enable_ocsp_stapling(self, ssl_vhost, unused_options, prefetch=False):
|
||||
"""Enables OCSP Stapling
|
||||
|
||||
In OCSP, each client (e.g. browser) would have to query the
|
||||
@@ -1735,6 +1736,9 @@ class ApacheConfigurator(common.Installer):
|
||||
:param unused_options: Not currently used
|
||||
:type unused_options: Not Available
|
||||
|
||||
:param prefetch: Use OCSP prefetching
|
||||
:type prefetch: bool
|
||||
|
||||
:returns: Success, general_vhost (HTTP vhost)
|
||||
:rtype: (bool, :class:`~certbot_apache._internal.obj.VirtualHost`)
|
||||
|
||||
@@ -1745,8 +1749,15 @@ class ApacheConfigurator(common.Installer):
|
||||
"Unable to set OCSP directives.\n"
|
||||
"Apache version is below 2.3.3.")
|
||||
|
||||
if "socache_shmcb_module" not in self.parser.modules:
|
||||
self.enable_mod("socache_shmcb")
|
||||
if prefetch:
|
||||
if "socache_dbm_module" not in self.parser.modules:
|
||||
self.enable_mod("socache_dbm")
|
||||
cache_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||
cache_dir = ["dbm:"+cache_path]
|
||||
else:
|
||||
if "socache_shmcb_module" not in self.parser.modules:
|
||||
self.enable_mod("socache_shmcb")
|
||||
cache_dir = ["shmcb:/var/run/apache2/stapling_cache(128000)"]
|
||||
|
||||
# Check if there's an existing SSLUseStapling directive on.
|
||||
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
|
||||
@@ -1766,9 +1777,7 @@ class ApacheConfigurator(common.Installer):
|
||||
self.parser.aug.remove(
|
||||
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
|
||||
|
||||
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
|
||||
"SSLStaplingCache",
|
||||
["shmcb:/var/run/apache2/stapling_cache(128000)"])
|
||||
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path, "SSLStaplingCache", cache_dir)
|
||||
|
||||
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
|
||||
ssl_vhost.filep)
|
||||
|
||||
@@ -67,3 +67,9 @@ AUTOHSTS_FREQ = 172800
|
||||
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
|
||||
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
|
||||
"""Managed by Certbot comments and the VirtualHost identification template"""
|
||||
|
||||
OCSP_APACHE_TTL = 432000
|
||||
"""Apache TTL for OCSP response in seconds: 5 days"""
|
||||
|
||||
OCSP_INTERNAL_TTL = 86400
|
||||
"""Internal TTL for OCSP response in seconds: 1 day"""
|
||||
|
||||
@@ -9,14 +9,16 @@ from certbot import interfaces
|
||||
from certbot import util
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
|
||||
from certbot_apache._internal import apache_util
|
||||
from certbot_apache._internal import configurator
|
||||
from certbot_apache._internal import prefetch_ocsp
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@zope.interface.provider(interfaces.IPluginFactory)
|
||||
class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
class DebianConfigurator(prefetch_ocsp.OCSPPrefetchMixin, configurator.ApacheConfigurator):
|
||||
"""Debian specific ApacheConfigurator override class"""
|
||||
|
||||
OS_DEFAULTS = dict(
|
||||
|
||||
394
certbot-apache/certbot_apache/_internal/prefetch_ocsp.py
Normal file
394
certbot-apache/certbot_apache/_internal/prefetch_ocsp.py
Normal file
@@ -0,0 +1,394 @@
|
||||
"""A mixin class for OCSP response prefetching for Apache plugin.
|
||||
|
||||
The OCSP prefetching functionality solves multiple issues in Apache httpd
|
||||
that make using OCSP must-staple error prone.
|
||||
|
||||
The prefetching functionality works by storing a value to PluginStorage,
|
||||
noting certificates that Certbot should keep OCSP staples (OCSP responses)
|
||||
updated for alongside of the information when the last response was
|
||||
updated by Certbot.
|
||||
|
||||
When Certbot is invoked, typically by scheduled "certbot renew" and the
|
||||
TTL from "lastupdate" value in PluginStorage entry has expired,
|
||||
Certbot then proceeds to fetch a new OCSP response from the OCSP servers
|
||||
pointed by the certificate.
|
||||
|
||||
The OCSP response is validated and if valid, stored to Apache DBM
|
||||
cache. A high internal cache expiry value is set for Apache in order
|
||||
to make it to not to discard the stored response and try to renew
|
||||
the staple itself letting Certbot to renew it on its subsequent run
|
||||
instead.
|
||||
|
||||
The DBM cache file used by Apache is a lightweight key-value storage.
|
||||
For OCSP response caching, the sha1 hash of certificate fingerprint
|
||||
is used as a key. The value consists of expiry time as timestamp
|
||||
in microseconds, \x01 delimiter and the raw OCSP response.
|
||||
|
||||
When restarting Apache, Certbot backups the current OCSP response
|
||||
cache, and restores it after the restart has happened. This is
|
||||
done because Apache deletes and then recreates the file upon
|
||||
restart.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import shutil
|
||||
import time
|
||||
|
||||
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import errors
|
||||
from certbot._internal import ocsp
|
||||
from certbot.plugins.enhancements import OCSPPrefetchEnhancement
|
||||
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
from certbot_apache._internal import apache_util
|
||||
from certbot_apache._internal import constants
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DBMHandler(object):
|
||||
"""Context manager to handle DBM file reads and writes"""
|
||||
|
||||
def __init__(self, filename, mode):
|
||||
self.filename = filename
|
||||
self.filemode = mode
|
||||
self.bsddb = False
|
||||
self.database = None
|
||||
|
||||
def __enter__(self):
|
||||
"""Open the DBM file and return the filehandle"""
|
||||
|
||||
try:
|
||||
import bsddb
|
||||
self.bsddb = True
|
||||
try:
|
||||
self.database = bsddb.hashopen(self.filename, self.filemode)
|
||||
except Exception:
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
except ImportError:
|
||||
# Python3 doesn't have bsddb module, so we use dbm.ndbm instead
|
||||
import dbm
|
||||
if self.filename.endswith(".db"):
|
||||
self.filename = self.filename[:-3]
|
||||
try:
|
||||
self.database = dbm.ndbm.open(self.filename, self.filemode) # pylint: disable=no-member
|
||||
except Exception:
|
||||
# This is raised if a file cannot be found
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
return self.database
|
||||
|
||||
def __exit__(self, *args):
|
||||
"""Close the DBM file"""
|
||||
if self.bsddb:
|
||||
self.database.sync()
|
||||
self.database.close()
|
||||
|
||||
|
||||
class OCSPPrefetchMixin(object):
|
||||
"""OCSPPrefetchMixin implements OCSP response prefetching"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._ocsp_prefetch = {} # type: Dict[str, str]
|
||||
# This is required because of python super() call chain.
|
||||
# Additionally, mypy isn't able to figure the chain out and needs to be
|
||||
# disabled for this line. See https://github.com/python/mypy/issues/5887
|
||||
super(OCSPPrefetchMixin, self).__init__(*args, **kwargs) # type: ignore
|
||||
|
||||
def _ensure_ocsp_dirs(self):
|
||||
"""Makes sure that the OCSP directory paths exist."""
|
||||
ocsp_work = os.path.join(self.config.work_dir, "ocsp_work")
|
||||
ocsp_save = os.path.join(self.config.work_dir, "ocsp")
|
||||
for path in [ocsp_work, ocsp_save]:
|
||||
if not os.path.isdir(path):
|
||||
filesystem.makedirs(path, 0o755)
|
||||
|
||||
def _ensure_ocsp_prefetch_compatibility(self):
|
||||
"""Make sure that the operating system supports the required libraries
|
||||
to manage Apache DBM files.
|
||||
|
||||
:raises: errors.NotSupportedError
|
||||
"""
|
||||
try:
|
||||
import bsddb # pylint: disable=unused-variable
|
||||
except ImportError:
|
||||
import dbm
|
||||
if not hasattr(dbm, 'ndbm') or dbm.ndbm.library != 'Berkeley DB': # pylint: disable=no-member
|
||||
msg = ("Unfortunately your operating system does not have a "
|
||||
"compatible database module available for managing "
|
||||
"Apache OCSP stapling cache database.")
|
||||
raise errors.NotSupportedError(msg)
|
||||
|
||||
def _ocsp_refresh_needed(self, pf_obj):
|
||||
"""Refreshes OCSP response for a certiifcate if it's due
|
||||
|
||||
:param dict pf_obj: OCSP prefetch object from pluginstorage
|
||||
|
||||
:returns: If OCSP response was updated
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL
|
||||
if ttl < time.time():
|
||||
self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"])
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _ocsp_refresh(self, cert_path, chain_path):
|
||||
"""Refresh the OCSP response for a certificate
|
||||
|
||||
:param str cert_path: Filesystem path to certificate file
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
|
||||
self._ensure_ocsp_dirs()
|
||||
ocsp_workfile = os.path.join(
|
||||
self.config.work_dir, "ocsp_work",
|
||||
apache_util.certid_sha1_hex(cert_path))
|
||||
handler = ocsp.RevocationChecker()
|
||||
if not handler.ocsp_revoked_by_paths(cert_path, chain_path, ocsp_workfile):
|
||||
# Guaranteed good response
|
||||
cache_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||
cert_sha = apache_util.certid_sha1(cert_path)
|
||||
# dbm.open automatically adds the file extension
|
||||
self._write_to_dbm(cache_path, cert_sha, self._ocsp_response_dbm(ocsp_workfile))
|
||||
else:
|
||||
logger.warning("Encountered an issue while trying to prefetch OCSP "
|
||||
"response for certificate: %s", cert_path)
|
||||
# Clean up
|
||||
try:
|
||||
os.remove(ocsp_workfile)
|
||||
except OSError:
|
||||
# The OCSP workfile did not exist because of an OCSP response fetching error
|
||||
return
|
||||
|
||||
def _write_to_dbm(self, filename, key, value):
|
||||
"""Helper method to write an OCSP response cache value to DBM.
|
||||
|
||||
:param filename: DBM database filename
|
||||
:param bytes key: Database key name
|
||||
:param bytes value: Database entry value
|
||||
"""
|
||||
tmp_file = os.path.join(
|
||||
self.config.work_dir,
|
||||
"ocsp_work",
|
||||
"tmp_" + os.path.basename(filename)
|
||||
)
|
||||
|
||||
apache_util.safe_copy(filename, tmp_file)
|
||||
|
||||
with DBMHandler(tmp_file, 'w') as db:
|
||||
db[key] = value
|
||||
|
||||
shutil.copy2(tmp_file, filename)
|
||||
os.remove(tmp_file)
|
||||
|
||||
def _read_dbm(self, filename):
|
||||
"""Helper method for reading the dbm using context manager.
|
||||
Used for tests.
|
||||
|
||||
:param str filename: DBM database filename
|
||||
|
||||
:returns: Dictionary of database keys and values
|
||||
:rtype: dict
|
||||
"""
|
||||
|
||||
ret = dict()
|
||||
with DBMHandler(filename, 'r') as db:
|
||||
for k in db.keys():
|
||||
ret[k] = db[k]
|
||||
return ret
|
||||
|
||||
def _ocsp_ttl(self, next_update):
|
||||
"""Calculates Apache internal TTL for the next OCSP staple
|
||||
update.
|
||||
|
||||
The resulting TTL is half of the time between now
|
||||
and the time noted by nextUpdate field in OCSP response.
|
||||
|
||||
If nextUpdate value is None, a default value will be
|
||||
used instead.
|
||||
|
||||
:param next_update: datetime value for nextUpdate or None
|
||||
|
||||
:returns: TTL in seconds.
|
||||
:rtype: int
|
||||
"""
|
||||
|
||||
if next_update is not None:
|
||||
now = datetime.fromtimestamp(time.time())
|
||||
res_ttl = int((next_update - now).total_seconds())
|
||||
if res_ttl > 0:
|
||||
return res_ttl/2
|
||||
return constants.OCSP_APACHE_TTL
|
||||
|
||||
def _ocsp_response_dbm(self, workfile):
|
||||
"""Creates a dbm entry for OCSP response data
|
||||
|
||||
:param str workfile: File path for raw OCSP response
|
||||
|
||||
:returns: OCSP response cache data that Apache can use
|
||||
:rtype: string
|
||||
|
||||
"""
|
||||
|
||||
handler = ocsp.RevocationChecker()
|
||||
_, _, next_update = handler.ocsp_times(workfile)
|
||||
ttl = self._ocsp_ttl(next_update)
|
||||
|
||||
with open(workfile, 'rb') as fh:
|
||||
response = fh.read()
|
||||
return apache_util.get_apache_ocsp_struct(ttl, response)
|
||||
|
||||
def _ocsp_prefetch_save(self, cert_path, chain_path):
|
||||
"""Saves status of current OCSP prefetch, including the last update
|
||||
time to determine if an update is needed on later run.
|
||||
|
||||
:param str cert_path: Filesystem path to certificate
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
status = {
|
||||
"lastupdate": time.time(),
|
||||
"cert_path": cert_path,
|
||||
"chain_path": chain_path
|
||||
}
|
||||
cert_id = apache_util.certid_sha1_hex(cert_path)
|
||||
self._ocsp_prefetch[cert_id] = status
|
||||
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
|
||||
self.storage.save()
|
||||
|
||||
def _ocsp_prefetch_fetch_state(self):
|
||||
"""
|
||||
Populates the OCSP prefetch state from the pluginstorage.
|
||||
"""
|
||||
try:
|
||||
self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch")
|
||||
except KeyError:
|
||||
self._ocsp_prefetch = dict()
|
||||
|
||||
def _ocsp_prefetch_backup_db(self):
|
||||
"""
|
||||
Copies the active dbm file to work directory. Logs a debug error
|
||||
message if unable to copy, but does not error out as it would
|
||||
prevent other critical functions that need to be carried out for
|
||||
Apache httpd.
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp_work"))
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue while trying to backup OCSP dbm file")
|
||||
|
||||
def _ocsp_prefetch_restore_db(self):
|
||||
"""
|
||||
Restores the active dbm file from work directory. Logs a debug error
|
||||
message if unable to restore, but does not error out as it would
|
||||
prevent other critical functions that need to be carried out for
|
||||
Apache httpd.
|
||||
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||
work_file_path = os.path.join(self.config.work_dir, "ocsp_work", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(work_file_path, cache_path)
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue when trying to restore OCSP dbm file")
|
||||
|
||||
def enable_ocsp_prefetch(self, lineage, domains):
|
||||
"""Enable OCSP Stapling and prefetching of the responses.
|
||||
|
||||
In OCSP, each client (e.g. browser) would have to query the
|
||||
OCSP Responder to validate that the site certificate was not revoked.
|
||||
|
||||
Enabling OCSP Stapling, would allow the web-server to query the OCSP
|
||||
Responder, and staple its response to the offered certificate during
|
||||
TLS. i.e. clients would not have to query the OCSP responder.
|
||||
|
||||
"""
|
||||
|
||||
# Fail early if we are not able to support this
|
||||
self._ensure_ocsp_prefetch_compatibility()
|
||||
prefetch_vhosts = set()
|
||||
for domain in domains:
|
||||
matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
|
||||
# We should be handling only SSL vhosts
|
||||
for vh in matched_vhosts:
|
||||
if vh.ssl:
|
||||
prefetch_vhosts.add(vh)
|
||||
|
||||
if not prefetch_vhosts:
|
||||
raise errors.MisconfigurationError(
|
||||
"Could not find VirtualHost to enable OCSP prefetching on."
|
||||
)
|
||||
|
||||
try:
|
||||
# The try - block is huge, but required for handling rollback properly.
|
||||
for vh in prefetch_vhosts:
|
||||
self._enable_ocsp_stapling(vh, None, prefetch=True)
|
||||
|
||||
self._ensure_ocsp_dirs()
|
||||
self.restart()
|
||||
# Ensure Apache has enough time to properly restart and create the file
|
||||
time.sleep(2)
|
||||
self._ocsp_refresh(lineage.cert_path, lineage.chain_path)
|
||||
self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path)
|
||||
self.save("Enabled OCSP prefetching")
|
||||
except errors.PluginError as err:
|
||||
# Revert the OCSP prefetch configuration
|
||||
self.recovery_routine()
|
||||
self.restart()
|
||||
msg = ("Encountered an error while trying to enable OCSP prefetch "
|
||||
"enhancement: %s\nOCSP prefetch was not enabled.")
|
||||
raise errors.PluginError(msg % str(err))
|
||||
|
||||
def update_ocsp_prefetch(self, _unused_lineage):
|
||||
"""Checks all certificates that are managed by OCSP prefetch, and
|
||||
refreshes OCSP responses for them if required."""
|
||||
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if not self._ocsp_prefetch:
|
||||
# No OCSP prefetching enabled for any certificate
|
||||
return
|
||||
|
||||
for _, pf in self._ocsp_prefetch.items():
|
||||
if not self._ocsp_refresh_needed(pf):
|
||||
continue
|
||||
# Save the status to pluginstorage
|
||||
self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"])
|
||||
|
||||
def restart(self):
|
||||
"""Reloads the Apache server. When restarting, Apache deletes
|
||||
the DBM cache file used to store OCSP staples. In this override
|
||||
function, Certbot checks the pluginstorage if we're supposed to
|
||||
manage OCSP prefetching. If needed, Certbot will backup the DBM
|
||||
file, restoring it after calling restart.
|
||||
|
||||
:raises .errors.MisconfigurationError: If either the config test
|
||||
or reload fails.
|
||||
|
||||
"""
|
||||
if not self._ocsp_prefetch:
|
||||
# Try to populate OCSP prefetch structure from pluginstorage
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if self._ocsp_prefetch:
|
||||
# OCSP prefetching is enabled, so back up the db
|
||||
self._ocsp_prefetch_backup_db()
|
||||
|
||||
try:
|
||||
# Ignored because mypy doesn't know that this class is used as
|
||||
# a mixin and fails because object has no restart method.
|
||||
super(OCSPPrefetchMixin, self).restart() # type: ignore
|
||||
finally:
|
||||
if self._ocsp_prefetch:
|
||||
# Restore the backed up dbm database
|
||||
self._ocsp_prefetch_restore_db()
|
||||
|
||||
|
||||
OCSPPrefetchEnhancement.register(OCSPPrefetchMixin) # pylint: disable=no-member
|
||||
@@ -1,3 +1,3 @@
|
||||
# Remember to update setup.py to match the package versions below.
|
||||
acme[dev]==0.29.0
|
||||
certbot[dev]==1.1.0
|
||||
-e certbot[dev]
|
||||
|
||||
@@ -10,7 +10,7 @@ version = '1.2.0.dev0'
|
||||
# acme/certbot version.
|
||||
install_requires = [
|
||||
'acme>=0.29.0',
|
||||
'certbot>=1.1.0',
|
||||
'certbot>=1.2.0.dev0',
|
||||
'mock',
|
||||
'python-augeas',
|
||||
'setuptools',
|
||||
|
||||
415
certbot-apache/tests/ocsp_prefetch_test.py
Normal file
415
certbot-apache/tests/ocsp_prefetch_test.py
Normal file
@@ -0,0 +1,415 @@
|
||||
"""Test for certbot_apache._internal.configurator OCSP Prefetching functionality"""
|
||||
import base64
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import json
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
# six is used in mock.patch()
|
||||
import six # pylint: disable=unused-import
|
||||
|
||||
from acme.magic_typing import Dict, List, Set, Union # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import errors
|
||||
|
||||
from certbot.compat import os
|
||||
import util
|
||||
|
||||
|
||||
class MockDBM(object):
|
||||
# pylint: disable=missing-docstring
|
||||
"""Main mock DBM class for Py3 dbm module"""
|
||||
def __init__(self, library='Berkeley DB'):
|
||||
self.ndbm = Mockdbm_impl(library)
|
||||
|
||||
|
||||
class Mockdbm_impl(object):
|
||||
"""Mock dbm implementation that satisfies both bsddb and dbm interfaces"""
|
||||
# pylint: disable=missing-docstring
|
||||
|
||||
def __init__(self, library='Berkeley DB'):
|
||||
self.library = library
|
||||
self.name = 'ndbm'
|
||||
|
||||
def open(self, path, mode):
|
||||
return Mockdb(path, mode)
|
||||
|
||||
def hashopen(self, path, mode):
|
||||
return Mockdb(path, mode)
|
||||
|
||||
|
||||
class Mockdb(object):
|
||||
"""Mock dbm.db for both bsddb and dbm databases"""
|
||||
# pylint: disable=missing-docstring
|
||||
def __init__(self, path, mode):
|
||||
self._data = dict() # type: Dict[str, str]
|
||||
if mode == "r" or mode == "w":
|
||||
if not path.endswith(".db"):
|
||||
path = path+".db"
|
||||
with open(path, 'r') as fh:
|
||||
try:
|
||||
self._data = json.loads(fh.read())
|
||||
except Exception: # pylint: disable=broad-except
|
||||
self._data = dict()
|
||||
self.path = path
|
||||
self.mode = mode
|
||||
|
||||
def __setitem__(self, key, item):
|
||||
bkey = base64.b64encode(key)
|
||||
bitem = base64.b64encode(item)
|
||||
self._data[bkey.decode()] = bitem.decode()
|
||||
|
||||
def __getitem__(self, key):
|
||||
bkey = base64.b64encode(key)
|
||||
return base64.b64decode(self._data[bkey.decode()])
|
||||
|
||||
def keys(self):
|
||||
return [base64.b64decode(k) for k in self._data.keys()]
|
||||
|
||||
def sync(self):
|
||||
return
|
||||
|
||||
def close(self):
|
||||
with open(self.path, 'w') as fh:
|
||||
fh.write(json.dumps(self._data))
|
||||
|
||||
|
||||
class OCSPPrefetchTest(util.ApacheTest):
|
||||
"""Tests for OCSP Prefetch feature"""
|
||||
# pylint: disable=protected-access
|
||||
|
||||
def setUp(self): # pylint: disable=arguments-differ
|
||||
super(OCSPPrefetchTest, self).setUp()
|
||||
|
||||
self.config = util.get_apache_configurator(
|
||||
self.config_path, self.vhost_path, self.config_dir, self.work_dir,
|
||||
os_info="debian")
|
||||
|
||||
self.lineage = mock.MagicMock(cert_path="cert", chain_path="chain")
|
||||
self.config.parser.modules.add("headers_module")
|
||||
self.config.parser.modules.add("mod_headers.c")
|
||||
self.config.parser.modules.add("ssl_module")
|
||||
self.config.parser.modules.add("mod_ssl.c")
|
||||
self.config.parser.modules.add("socache_dbm_module")
|
||||
self.config.parser.modules.add("mod_socache_dbm.c")
|
||||
|
||||
self.vh_truth = util.get_vh_truth(
|
||||
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
|
||||
self.config._ensure_ocsp_dirs()
|
||||
self.db_path = os.path.join(self.work_dir, "ocsp", "ocsp_cache") + ".db"
|
||||
|
||||
def _call_mocked(self, func, *args, **kwargs):
|
||||
"""Helper method to call functins with mock stack"""
|
||||
|
||||
def mock_restart():
|
||||
"""Mock ApacheConfigurator.restart that creates the dbm file"""
|
||||
# Mock the Apache dbm file creation
|
||||
open(self.db_path, 'a').close()
|
||||
|
||||
ver_path = "certbot_apache._internal.configurator.ApacheConfigurator.get_version"
|
||||
res_path = "certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin.restart"
|
||||
cry_path = "certbot.crypto_util.cert_sha1_fingerprint"
|
||||
|
||||
with mock.patch(ver_path) as mock_ver:
|
||||
mock_ver.return_value = (2, 4, 10)
|
||||
with mock.patch(cry_path) as mock_cry:
|
||||
mock_cry.return_value = b'j\x056\x1f\xfa\x08B\xe8D\xa1Bn\xeb*A\xebWx\xdd\xfe'
|
||||
with mock.patch(res_path, side_effect=mock_restart):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
def call_mocked_py2(self, func, *args, **kwargs):
|
||||
"""Calls methods with imports mocked to suit Py2 environment"""
|
||||
if 'dbm' in sys.modules.keys(): # pragma: no cover
|
||||
sys.modules['dbm'] = None
|
||||
sys.modules['bsddb'] = Mockdbm_impl()
|
||||
return self._call_mocked(func, *args, **kwargs)
|
||||
|
||||
def call_mocked_py3(self, func, *args, **kwargs):
|
||||
"""Calls methods with imports mocked to suit Py3 environment"""
|
||||
real_import = six.moves.builtins.__import__
|
||||
|
||||
def mock_import(*args, **kwargs):
|
||||
"""Mock import to raise ImportError for Py2 specific module to make
|
||||
ApacheConfigurator pick the correct one for Python3 regardless of the
|
||||
python version the tests are running under."""
|
||||
if args[0] == "bsddb":
|
||||
raise ImportError
|
||||
return real_import(*args, **kwargs)
|
||||
|
||||
with mock.patch('six.moves.builtins.__import__', side_effect=mock_import):
|
||||
sys.modules['dbm'] = MockDBM()
|
||||
return self._call_mocked(func, *args, **kwargs)
|
||||
|
||||
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator.enable_mod")
|
||||
def test_ocsp_prefetch_enable_mods(self, mock_enable):
|
||||
self.config.parser.modules.discard("socache_dbm_module")
|
||||
self.config.parser.modules.discard("mod_socache_dbm.c")
|
||||
self.config.parser.modules.discard("headers_module")
|
||||
self.config.parser.modules.discard("mod_header.c")
|
||||
|
||||
ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
|
||||
with mock.patch(ref_path):
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
self.assertTrue(mock_enable.called)
|
||||
self.assertEqual(len(self.config._ocsp_prefetch), 1)
|
||||
|
||||
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator.enable_mod")
|
||||
def test_ocsp_prefetch_enable_error(self, _mock_enable):
|
||||
ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
|
||||
self.config.recovery_routine = mock.MagicMock()
|
||||
with mock.patch(ref_path, side_effect=errors.PluginError("failed")):
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.call_mocked_py2,
|
||||
self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertTrue(self.config.recovery_routine.called)
|
||||
|
||||
def test_ocsp_prefetch_vhost_not_found_error(self):
|
||||
choose_path = "certbot_apache._internal.override_debian.DebianConfigurator.choose_vhosts"
|
||||
with mock.patch(choose_path) as mock_choose:
|
||||
mock_choose.return_value = []
|
||||
self.assertRaises(errors.MisconfigurationError,
|
||||
self.call_mocked_py2,
|
||||
self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["domainnotfound"])
|
||||
|
||||
@mock.patch("certbot_apache._internal.constants.OCSP_INTERNAL_TTL", 0)
|
||||
def test_ocsp_prefetch_refresh(self):
|
||||
def ocsp_req_mock(_cert, _chain, workfile):
|
||||
"""Method to mock the OCSP request and write response to file"""
|
||||
with open(workfile, 'w') as fh:
|
||||
fh.write("MOCKRESPONSE")
|
||||
return False
|
||||
|
||||
ocsp_path = "certbot._internal.ocsp.RevocationChecker.ocsp_revoked_by_paths"
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||
with mock.patch('certbot._internal.ocsp.RevocationChecker.ocsp_times') as mock_times:
|
||||
produced_at = datetime.today() - timedelta(days=1)
|
||||
this_update = datetime.today() - timedelta(days=2)
|
||||
next_update = datetime.today() + timedelta(days=2)
|
||||
mock_times.return_value = produced_at, this_update, next_update
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
odbm = self.config._read_dbm(self.db_path)
|
||||
self.assertEqual(len(odbm.keys()), 1)
|
||||
# The actual response data is prepended by Apache timestamp
|
||||
self.assertTrue(odbm[list(odbm.keys())[0]].endswith(b'MOCKRESPONSE'))
|
||||
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock) as mock_ocsp:
|
||||
with mock.patch('certbot._internal.ocsp.RevocationChecker.ocsp_times') as mock_times:
|
||||
produced_at = datetime.today() - timedelta(days=1)
|
||||
this_update = datetime.today() - timedelta(days=2)
|
||||
next_update = datetime.today() + timedelta(days=2)
|
||||
mock_times.return_value = produced_at, this_update, next_update
|
||||
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
|
||||
self.assertTrue(mock_ocsp.called)
|
||||
|
||||
def test_ocsp_prefetch_refresh_noop(self):
|
||||
def ocsp_req_mock(_cert, _chain, workfile):
|
||||
"""Method to mock the OCSP request and write response to file"""
|
||||
with open(workfile, 'w') as fh:
|
||||
fh.write("MOCKRESPONSE")
|
||||
return True
|
||||
|
||||
ocsp_path = "certbot._internal.ocsp.RevocationChecker.ocsp_revoked_by_paths"
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
self.assertEqual(len(self.config._ocsp_prefetch), 1)
|
||||
refresh_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
|
||||
with mock.patch(refresh_path) as mock_refresh:
|
||||
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
def test_ocsp_prefetch_backup_db(self, _mock_test):
|
||||
def ocsp_del_db():
|
||||
"""Side effect of _reload() that deletes the DBM file, like Apache
|
||||
does when restarting"""
|
||||
os.remove(self.db_path)
|
||||
self.assertFalse(os.path.isfile(self.db_path))
|
||||
|
||||
# Make sure that the db file exists
|
||||
open(self.db_path, 'a').close()
|
||||
self.call_mocked_py2(self.config._write_to_dbm, self.db_path, b'mock_key', b'mock_value')
|
||||
|
||||
# Mock OCSP prefetch dict to signify that there should be a db
|
||||
self.config._ocsp_prefetch = {"mock": "value"}
|
||||
rel_path = "certbot_apache._internal.configurator.ApacheConfigurator._reload"
|
||||
with mock.patch(rel_path, side_effect=ocsp_del_db):
|
||||
self.config.restart()
|
||||
|
||||
odbm = self.config._read_dbm(self.db_path)
|
||||
self.assertEqual(odbm[b'mock_key'], b'mock_value')
|
||||
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
|
||||
def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test):
|
||||
log_path = "certbot_apache._internal.prefetch_ocsp.logger.debug"
|
||||
log_string = "Encountered an issue while trying to backup OCSP dbm file"
|
||||
log_string2 = "Encountered an issue when trying to restore OCSP dbm file"
|
||||
self.config._ocsp_prefetch = {"mock": "value"}
|
||||
with mock.patch("shutil.copy2", side_effect=IOError):
|
||||
with mock.patch(log_path) as mock_log:
|
||||
self.config.restart()
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertEqual(mock_log.call_count, 2)
|
||||
self.assertTrue(log_string in mock_log.call_args_list[0][0][0])
|
||||
self.assertTrue(log_string2 in mock_log.call_args_list[1][0][0])
|
||||
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin.restart")
|
||||
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
|
||||
ocsp_path = "certbot._internal.ocsp.RevocationChecker.ocsp_revoked_by_paths"
|
||||
log_path = "certbot_apache._internal.prefetch_ocsp.logger.warning"
|
||||
with mock.patch(ocsp_path) as mock_ocsp:
|
||||
mock_ocsp.return_value = True
|
||||
with mock.patch(log_path) as mock_log:
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertTrue(
|
||||
"trying to prefetch OCSP" in mock_log.call_args[0][0])
|
||||
|
||||
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh_needed")
|
||||
def test_ocsp_prefetch_update_noop(self, mock_refresh):
|
||||
self.config.update_ocsp_prefetch(None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
||||
def test_ocsp_prefetch_preflight_check_noerror(self):
|
||||
self.call_mocked_py2(self.config._ensure_ocsp_prefetch_compatibility)
|
||||
self.call_mocked_py3(self.config._ensure_ocsp_prefetch_compatibility)
|
||||
|
||||
real_import = six.moves.builtins.__import__
|
||||
|
||||
def mock_import(*args, **kwargs):
|
||||
if args[0] == "bsddb":
|
||||
raise ImportError
|
||||
return real_import(*args, **kwargs)
|
||||
with mock.patch('six.moves.builtins.__import__', side_effect=mock_import):
|
||||
sys.modules['dbm'] = MockDBM(library='Not Berkeley DB')
|
||||
self.assertRaises(errors.NotSupportedError,
|
||||
self._call_mocked,
|
||||
self.config._ensure_ocsp_prefetch_compatibility)
|
||||
|
||||
def test_ocsp_prefetch_open_dbm_no_file(self):
|
||||
open(self.db_path, 'a').close()
|
||||
db_not_exists = self.db_path+"nonsense"
|
||||
self.call_mocked_py2(self.config._write_to_dbm, self.db_path, b'k', b'v')
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.call_mocked_py2,
|
||||
self.config._write_to_dbm,
|
||||
db_not_exists,
|
||||
b'k', b'v')
|
||||
|
||||
def test_ocsp_prefetch_py2_open_file_error(self):
|
||||
open(self.db_path, 'a').close()
|
||||
mock_db = mock.MagicMock()
|
||||
mock_db.hashopen.side_effect = Exception("error")
|
||||
sys.modules["bsddb"] = mock_db
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.config._write_to_dbm,
|
||||
self.db_path,
|
||||
b'k', b'v')
|
||||
|
||||
def test_ocsp_prefetch_py3_open_file_error(self):
|
||||
open(self.db_path, 'a').close()
|
||||
mock_db = mock.MagicMock()
|
||||
mock_db.ndbm.open.side_effect = Exception("error")
|
||||
sys.modules["dbm"] = mock_db
|
||||
sys.modules["bsddb"] = None
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.config._write_to_dbm,
|
||||
self.db_path,
|
||||
b'k', b'v')
|
||||
|
||||
def test_ocsp_prefetch_open_close_py2_noerror(self):
|
||||
expected_val = b'whatever_value'
|
||||
open(self.db_path, 'a').close()
|
||||
self.call_mocked_py2(
|
||||
self.config._write_to_dbm, self.db_path,
|
||||
b'key', expected_val
|
||||
)
|
||||
db2 = self.call_mocked_py2(self.config._read_dbm, self.db_path)
|
||||
self.assertEqual(db2[b'key'], expected_val)
|
||||
|
||||
def test_ocsp_prefetch_open_close_py3_noerror(self):
|
||||
expected_val = b'whatever_value'
|
||||
open(self.db_path, 'a').close()
|
||||
self.call_mocked_py3(
|
||||
self.config._write_to_dbm, self.db_path,
|
||||
b'key', expected_val
|
||||
)
|
||||
db2 = self.call_mocked_py3(self.config._read_dbm, self.db_path)
|
||||
self.assertEqual(db2[b'key'], expected_val)
|
||||
|
||||
def test_ocsp_prefetch_safe_open_hash_mismatch(self):
|
||||
import random
|
||||
def mock_hash(_filepath):
|
||||
# shound not be the same value twice
|
||||
return str(random.getrandbits(1024))
|
||||
|
||||
open(self.db_path, 'a').close()
|
||||
with mock.patch("certbot_apache._internal.apache_util._file_hash", side_effect=mock_hash):
|
||||
self.assertRaises(
|
||||
errors.PluginError,
|
||||
self.call_mocked_py3,
|
||||
self.config._write_to_dbm, self.db_path,
|
||||
b'anything', b'irrelevant'
|
||||
)
|
||||
|
||||
def test_ocsp_prefetch_safe_open_hash_fail_open(self):
|
||||
open(self.db_path, 'a').close()
|
||||
with mock.patch("certbot_apache._internal.apache_util._file_hash", side_effect=IOError):
|
||||
self.assertRaises(
|
||||
errors.PluginError,
|
||||
self.call_mocked_py3,
|
||||
self.config._write_to_dbm, self.db_path,
|
||||
b'anything', b'irrelevant'
|
||||
)
|
||||
|
||||
@mock.patch("certbot_apache._internal.constants.OCSP_APACHE_TTL", 1234)
|
||||
def test_ttl(self):
|
||||
self.assertEqual(self.config._ocsp_ttl(None), 1234)
|
||||
next_update = datetime.today() + timedelta(days=6)
|
||||
ttl = self.config._ocsp_ttl(next_update)
|
||||
# ttl should be roughly 3 days
|
||||
self.assertTrue(ttl > 86400*2)
|
||||
self.assertTrue(ttl < 86400*4)
|
||||
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_fetch_state")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
|
||||
def test_restart_load_state_call(self, _rl, _ct, mock_fch):
|
||||
self.assertFalse(self.config._ocsp_prefetch)
|
||||
self.config.restart()
|
||||
self.assertTrue(mock_fch.called)
|
||||
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_backup_db")
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_restore_db")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
|
||||
def test_restart_backupdb_call(self, _rl, _ctest, mock_rest, mock_bck):
|
||||
self.config._ocsp_prefetch = True
|
||||
self.config.restart()
|
||||
self.assertTrue(mock_rest.called)
|
||||
self.assertTrue(mock_bck.called)
|
||||
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_backup_db")
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_restore_db")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
|
||||
def test_restart_recover_error(self, mock_reload, _ctest, mock_rest, mock_bck):
|
||||
self.config._ocsp_prefetch = True
|
||||
mock_reload.side_effect = errors.MisconfigurationError
|
||||
self.assertRaises(errors.MisconfigurationError, self.config.restart)
|
||||
self.assertTrue(mock_bck.called)
|
||||
self.assertTrue(mock_rest.called)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
@@ -6,7 +6,8 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
|
||||
|
||||
### Added
|
||||
|
||||
*
|
||||
* OCSP prefetching functionality for Apache plugin that attempts to refresh the OCSP
|
||||
response cache for managed certificates when scheduled Certbot renew is being run.
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ CLI_DEFAULTS = dict(
|
||||
redirect=None,
|
||||
auto_hsts=False,
|
||||
hsts=None,
|
||||
ocsp_prefetch=False,
|
||||
uir=None,
|
||||
staple=None,
|
||||
strict_permissions=False,
|
||||
|
||||
@@ -805,6 +805,11 @@ def install(config, plugins):
|
||||
"If your certificate is managed by Certbot, please use --cert-name "
|
||||
"to define which certificate you would like to install.")
|
||||
|
||||
# It's important that the old style enhancements get enabled before
|
||||
# the new style ones, as some of the new enhancements can modify the
|
||||
# same configuration directives. _install_cert() in the above block
|
||||
# handles the old style enhancements here.
|
||||
|
||||
if enhancements.are_requested(config):
|
||||
# In the case where we don't have certname, we have errored out already
|
||||
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
||||
@@ -925,9 +930,14 @@ def enhance(config, plugins):
|
||||
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
||||
if not config.chain_path:
|
||||
config.chain_path = lineage.chain_path
|
||||
if not config.cert_path:
|
||||
config.cert_path = lineage.cert_path
|
||||
if oldstyle_enh:
|
||||
le_client = _init_le_client(config, authenticator=None, installer=installer)
|
||||
le_client.enhance_config(domains, config.chain_path, ask_redirect=False)
|
||||
# It's important that the old style enhancements get enabled before
|
||||
# the new style ones, as some of the new enhancements can modify the
|
||||
# same configuration directives.
|
||||
if enhancements.are_requested(config):
|
||||
enhancements.enable(lineage, domains, installer, config)
|
||||
|
||||
@@ -1106,7 +1116,10 @@ def run(config, plugins):
|
||||
_report_new_cert(config, cert_path, fullchain_path, key_path)
|
||||
|
||||
_install_cert(config, le_client, domains, new_lineage)
|
||||
|
||||
# It's important that the old style enhancements get enabled before
|
||||
# the new style ones, as some of the new enhancements can modify the
|
||||
# same configuration directives. _install_cert() handles the old
|
||||
# style enhancements here.
|
||||
if enhancements.are_requested(config) and new_lineage:
|
||||
enhancements.enable(new_lineage, domains, installer, config)
|
||||
|
||||
|
||||
@@ -32,7 +32,6 @@ except (ImportError, AttributeError): # pragma: no cover
|
||||
ocsp = None # type: ignore
|
||||
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -65,32 +64,61 @@ class RevocationChecker(object):
|
||||
.. todo:: Make this a non-blocking call
|
||||
|
||||
:param `.storage.RenewableCert` cert: Certificate object
|
||||
|
||||
:returns: True if revoked; False if valid or the check failed or cert is expired.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
cert_path, chain_path = cert.cert, cert.chain
|
||||
return self.ocsp_revoked_by_paths(cert.cert, cert.chain)
|
||||
|
||||
if self.broken:
|
||||
return False
|
||||
def ocsp_revoked_by_paths(self, cert_path, chain_path, response_file=None):
|
||||
# type: (str, str, Optional[str]) -> bool
|
||||
"""Performs the OCSP revocation check
|
||||
|
||||
:param str cert_path: Certificate path
|
||||
:param str chain_path: Certificate chain filepath
|
||||
:param str response_file: File path to a file containing a raw OCSP response.
|
||||
|
||||
:returns: True if revoked; False if valid or the check failed or cert is expired.
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
# Let's Encrypt doesn't update OCSP for expired certificates,
|
||||
# so don't check OCSP if the cert is expired.
|
||||
# https://github.com/certbot/certbot/issues/7152
|
||||
now = pytz.UTC.fromutc(datetime.utcnow())
|
||||
if cert.target_expiry <= now:
|
||||
if crypto_util.notAfter(cert_path) <= now:
|
||||
return False
|
||||
|
||||
if self.broken:
|
||||
return False
|
||||
|
||||
url, host = _determine_ocsp_server(cert_path)
|
||||
if not host or not url:
|
||||
return False
|
||||
if self.use_openssl_binary:
|
||||
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, response_file)
|
||||
return _check_ocsp_cryptography(cert_path, chain_path, url, response_file)
|
||||
|
||||
def ocsp_times(self, response_file):
|
||||
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
|
||||
"""
|
||||
Reads OCSP response file and returns producedAt, thisUpdate
|
||||
and nextUpdate values in datetime format, or None if an error
|
||||
occurs.
|
||||
|
||||
:param str response_file: File path to OCSP response
|
||||
|
||||
:returns: tuple of producedAt, thisUpdate and nextUpdate values
|
||||
:rtype: tuple of datetime or None
|
||||
"""
|
||||
|
||||
if self.use_openssl_binary:
|
||||
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url)
|
||||
return _check_ocsp_cryptography(cert_path, chain_path, url)
|
||||
return _ocsp_times_openssl_bin(response_file)
|
||||
return _ocsp_times_cryptography(response_file)
|
||||
|
||||
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url):
|
||||
# type: (str, str, str, str) -> bool
|
||||
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, response_file=None):
|
||||
# type: (str, str, str, str, Optional[str]) -> bool
|
||||
# jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this!
|
||||
cmd = ["openssl", "ocsp",
|
||||
"-no_nonce",
|
||||
@@ -101,6 +129,8 @@ class RevocationChecker(object):
|
||||
"-verify_other", chain_path,
|
||||
"-trust_other",
|
||||
"-header"] + self.host_args(host)
|
||||
if response_file: # pragma: no cover
|
||||
cmd += ["-respout", response_file]
|
||||
logger.debug("Querying OCSP for %s", cert_path)
|
||||
logger.debug(" ".join(cmd))
|
||||
try:
|
||||
@@ -142,8 +172,58 @@ def _determine_ocsp_server(cert_path):
|
||||
return None, None
|
||||
|
||||
|
||||
def _check_ocsp_cryptography(cert_path, chain_path, url):
|
||||
# type: (str, str, str) -> bool
|
||||
def _ocsp_times_openssl_bin(response_file):
|
||||
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
|
||||
"""
|
||||
Reads OCSP response file using OpenSSL binary and returns
|
||||
producedAt, thisUpdate and nextUpdate values in datetime format.
|
||||
|
||||
:param str response_file: File path to OCSP response
|
||||
|
||||
:returns: tuple of producedAt, thisUpdate and nextUpdate values
|
||||
:rtype: tuple of datetime or None
|
||||
"""
|
||||
cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file]
|
||||
logger.debug("Reading OCSP response from file: %s", response_file)
|
||||
logger.debug(" ".join(cmd))
|
||||
try:
|
||||
output, _ = util.run_script(cmd, log=logger.debug)
|
||||
except errors.SubprocessError:
|
||||
logger.info("Reading OCSP response from file failed.")
|
||||
return None, None, None
|
||||
|
||||
prod_str, this_str, next_str = _translate_ocsp_response_times(output)
|
||||
prod_dt = util.parse_datetime(prod_str)
|
||||
this_dt = util.parse_datetime(this_str)
|
||||
next_dt = util.parse_datetime(next_str)
|
||||
return prod_dt, this_dt, next_dt
|
||||
|
||||
|
||||
def _ocsp_times_cryptography(response_file):
|
||||
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
|
||||
"""
|
||||
Reads OCSP response using cryptography and returns producedAt,
|
||||
thisUpdate and nextUpdate values in datetime format, or None
|
||||
if the file cannot be opened.
|
||||
|
||||
:param str response_file: File path to OCSP response
|
||||
|
||||
:returns: tuple of producedAt, thisUpdate and nextUpdate values
|
||||
:rtype: tuple of datetime or None
|
||||
"""
|
||||
|
||||
try:
|
||||
with open(response_file, 'rb') as fh:
|
||||
raw_response = fh.read()
|
||||
except OSError:
|
||||
return None, None, None
|
||||
|
||||
response = ocsp.load_der_ocsp_response(raw_response)
|
||||
return response.produced_at, response.this_update, response.next_update
|
||||
|
||||
|
||||
def _check_ocsp_cryptography(cert_path, chain_path, url, response_file=None):
|
||||
# type: (str, str, str, Optional[str]) -> bool
|
||||
# Retrieve OCSP response
|
||||
with open(chain_path, 'rb') as file_handler:
|
||||
issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
|
||||
@@ -163,6 +243,10 @@ def _check_ocsp_cryptography(cert_path, chain_path, url):
|
||||
logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code)
|
||||
return False
|
||||
|
||||
if response_file: # pragma: no cover
|
||||
with open(response_file, 'wb') as fh:
|
||||
fh.write(response.content)
|
||||
|
||||
response_ocsp = ocsp.load_der_ocsp_response(response.content)
|
||||
|
||||
# Check OCSP response validity
|
||||
@@ -296,5 +380,40 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
|
||||
return True
|
||||
else:
|
||||
logger.warning("Unable to properly parse OCSP output: %s\nstderr:%s",
|
||||
ocsp_output, ocsp_errors)
|
||||
ocsp_output, ocsp_errors)
|
||||
return False
|
||||
|
||||
|
||||
def _translate_ocsp_response_times(response):
|
||||
# type: (str) -> Tuple[str, str, str]
|
||||
"""
|
||||
Parse openssl OCSP response output and return producedAt,
|
||||
thisUpdate and nextUpdate values.
|
||||
|
||||
:param str response: OpenSSL OCSP response output
|
||||
|
||||
:returns: tuple of producedAt, thisUpdate and nextUpdate values
|
||||
:rtype: tuple of str
|
||||
"""
|
||||
|
||||
prod_pattern = "Produced At: (.+)$"
|
||||
this_pattern = "This Update: (.+)$"
|
||||
next_pattern = "Next Update: (.+)$"
|
||||
|
||||
prod_date = ""
|
||||
this_date = ""
|
||||
next_date = ""
|
||||
|
||||
prod_match = re.search(prod_pattern, response, flags=re.MULTILINE)
|
||||
if prod_match:
|
||||
prod_date = prod_match.group(1)
|
||||
|
||||
this_match = re.search(this_pattern, response, flags=re.MULTILINE)
|
||||
if this_match:
|
||||
this_date = this_match.group(1)
|
||||
|
||||
next_match = re.search(next_pattern, response, flags=re.MULTILINE)
|
||||
if next_match:
|
||||
next_date = next_match.group(1)
|
||||
|
||||
return prod_date, this_date, next_date
|
||||
|
||||
@@ -12,6 +12,7 @@ import warnings
|
||||
from cryptography import x509 # type: ignore
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
|
||||
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
||||
@@ -222,6 +223,33 @@ def verify_renewable_cert(renewable_cert):
|
||||
verify_cert_matches_priv_key(renewable_cert.cert_path, renewable_cert.key_path)
|
||||
|
||||
|
||||
def load_cert(cert_path):
|
||||
"""Reads the certificate PEM file and returns a cryptography.x509.Certificate object.
|
||||
|
||||
:param str cert_path: Path to the certificate
|
||||
|
||||
:rtype: `cryptography.x509.Certificate`
|
||||
:returns: x509 certificate object
|
||||
"""
|
||||
with open(cert_path, 'rb') as fh:
|
||||
cert_pem = fh.read()
|
||||
return x509.load_pem_x509_certificate(cert_pem, default_backend())
|
||||
|
||||
|
||||
def cert_sha1_fingerprint(cert_path):
|
||||
"""Read fingerprint of a certificate pointed by its file path
|
||||
and returns sha1 digest of said fingerprint.
|
||||
|
||||
:param str cert_path: File path to the x509 certificate file
|
||||
|
||||
:returns: SHA-1 fingerprint of the certificate
|
||||
:rtype: bytes
|
||||
"""
|
||||
|
||||
cert = load_cert(cert_path)
|
||||
return cert.fingerprint(hashes.SHA1())
|
||||
|
||||
|
||||
def verify_renewable_cert_sig(renewable_cert):
|
||||
"""Verifies the signature of a RenewableCert object.
|
||||
|
||||
@@ -294,7 +322,7 @@ def verify_cert_matches_priv_key(cert_path, key_path):
|
||||
error_str = "verifying the cert located at {0} matches the \
|
||||
private key located at {1} has failed. \
|
||||
Details: {2}".format(cert_path,
|
||||
key_path, e)
|
||||
key_path, e)
|
||||
logger.exception(error_str)
|
||||
raise errors.Error(error_str)
|
||||
|
||||
@@ -448,7 +476,7 @@ def _notAfterBefore(cert_path, method):
|
||||
# pylint: disable=redefined-outer-name
|
||||
with open(cert_path) as f:
|
||||
x509 = crypto.load_certificate(crypto.FILETYPE_PEM,
|
||||
f.read())
|
||||
f.read())
|
||||
# pyopenssl always returns bytes
|
||||
timestamp = method(x509)
|
||||
reformatted_timestamp = [timestamp[0:4], b"-", timestamp[4:6], b"-",
|
||||
@@ -478,6 +506,7 @@ def sha256sum(filename):
|
||||
sha256.update(file_d.read().encode('UTF-8'))
|
||||
return sha256.hexdigest()
|
||||
|
||||
|
||||
def cert_and_chain_from_fullchain(fullchain_pem):
|
||||
"""Split fullchain_pem into cert_pem and chain_pem
|
||||
|
||||
@@ -488,6 +517,7 @@ def cert_and_chain_from_fullchain(fullchain_pem):
|
||||
|
||||
"""
|
||||
cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
|
||||
crypto.load_certificate(crypto.FILETYPE_PEM, fullchain_pem)).decode()
|
||||
crypto.load_certificate(
|
||||
crypto.FILETYPE_PEM, fullchain_pem)).decode()
|
||||
chain = fullchain_pem[len(cert):].lstrip()
|
||||
return (cert, chain)
|
||||
|
||||
@@ -156,13 +156,62 @@ class AutoHSTSEnhancement(object):
|
||||
:type domains: str
|
||||
"""
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class OCSPPrefetchEnhancement(object):
|
||||
"""
|
||||
Enhancement interface that installer plugins can implement in order to
|
||||
provide functionality that prefetches an OCSP response and stores it
|
||||
to be served for incoming client requests.
|
||||
The plugins implementing new style enhancements are responsible of handling
|
||||
the saving of configuration checkpoints as well as calling possible restarts
|
||||
of managed software themselves. For update_ocsp_prefetch method, the installer
|
||||
may have to call prepare() to finalize the plugin initialization.
|
||||
Methods:
|
||||
enable_ocsp_prefetch is called when the domain is configured to
|
||||
serve OCSP responses using mechanism called OCSP Stapling.
|
||||
|
||||
update_ocsp_prefetch is called every time when Certbot is run using 'renew'
|
||||
verb. Certbot should proceed to make a request to the OCSP server in order
|
||||
to fetch an OCSP response and to store the recieved response, if valid.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def update_ocsp_prefetch(self, lineage, *args, **kwargs):
|
||||
"""
|
||||
Gets called for each lineage every time Certbot is run with 'renew' verb.
|
||||
Implementation of this method should fetch a fresh OCSP response if it's
|
||||
needed and if valid, store it to be served for connecting clients.
|
||||
|
||||
:param lineage: Certificate lineage object
|
||||
:type lineage: certbot.storage.RenewableCert
|
||||
|
||||
.. note:: prepare() method inherited from `interfaces.IPlugin` might need
|
||||
to be called manually within implementation of this interface method
|
||||
to finalize the plugin initialization.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def enable_ocsp_prefetch(self, lineage, domains, *args, **kwargs):
|
||||
"""
|
||||
Enables the OCSP enhancement, enabling OCSP Stapling functionality for
|
||||
the controlled software, and sets it up for prefetching the responses
|
||||
over the subsequent runs of Certbot renew.
|
||||
|
||||
:param lineage: Certificate lineage object
|
||||
:type lineage: certbot.storage.RenewableCert
|
||||
:param domains: List of domains in certificate to enhance
|
||||
:type domains: `list` of `str`
|
||||
"""
|
||||
|
||||
|
||||
# This is used to configure internal new style enhancements in Certbot. These
|
||||
# enhancement interfaces need to be defined in this file. Please do not modify
|
||||
# this list from plugin code.
|
||||
_INDEX = [
|
||||
{
|
||||
"name": "AutoHSTS",
|
||||
"cli_help": "Gradually increasing max-age value for HTTP Strict Transport "+
|
||||
"cli_help": "Gradually increasing max-age value for HTTP Strict Transport " +
|
||||
"Security security header",
|
||||
"cli_flag": "--auto-hsts",
|
||||
"cli_flag_default": constants.CLI_DEFAULTS["auto_hsts"],
|
||||
@@ -173,5 +222,19 @@ _INDEX = [
|
||||
"updater_function": "update_autohsts",
|
||||
"deployer_function": "deploy_autohsts",
|
||||
"enable_function": "enable_autohsts"
|
||||
},
|
||||
{
|
||||
"name": "OCSPPrefetch",
|
||||
"cli_help": "Prefetch OCSP responses for certificates in order to be " +
|
||||
"able to serve connecting clients fresh staple immediately",
|
||||
"cli_flag": "--prefetch-ocsp",
|
||||
"cli_flag_default": constants.CLI_DEFAULTS["ocsp_prefetch"],
|
||||
"cli_groups": ["security", "enhance"],
|
||||
"cli_dest": "ocsp_prefetch",
|
||||
"cli_action": "store_true",
|
||||
"class": OCSPPrefetchEnhancement,
|
||||
"updater_function": "update_ocsp_prefetch",
|
||||
"deployer_function": None,
|
||||
"enable_function": "enable_ocsp_prefetch"
|
||||
}
|
||||
] # type: List[Dict[str, Any]]
|
||||
|
||||
@@ -5,6 +5,7 @@ import argparse
|
||||
import atexit
|
||||
import collections
|
||||
from collections import OrderedDict
|
||||
from datetime import datetime
|
||||
import distutils.version # pylint: disable=import-error,no-name-in-module
|
||||
import errno
|
||||
import logging
|
||||
@@ -598,3 +599,19 @@ def atexit_register(func, *args, **kwargs):
|
||||
def _atexit_call(func, *args, **kwargs):
|
||||
if _INITIAL_PID == os.getpid():
|
||||
func(*args, **kwargs)
|
||||
|
||||
|
||||
def parse_datetime(dt_string):
|
||||
"""
|
||||
Parses a string to datetime, ignoring timezone.
|
||||
|
||||
:param str dt_string: String representation of date and time
|
||||
|
||||
:returns: datetime representation of time
|
||||
:rtype: datetime.datetime or None
|
||||
"""
|
||||
try:
|
||||
dateformat = "%b %d %H:%M:%S %Y %Z"
|
||||
return datetime.strptime(dt_string, dateformat)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
@@ -959,6 +959,30 @@ changed by passing the desired number to the command line flag
|
||||
want to alter the log rotation, check `/etc/logrotate.d/` for a
|
||||
certbot rotation script.
|
||||
|
||||
.. _prefetch-ocsp:
|
||||
|
||||
Prefetching OCSP responses
|
||||
==========================
|
||||
|
||||
Certbot users on Debian and Ubuntu based operating systems have the option to
|
||||
configure certbot to handle prefetching and management of OCSP staples in behalf
|
||||
of Apache process. This mitigates multiple issues that exist with Apache OCSP
|
||||
staple handling in cases where there are issues with either network connectivity
|
||||
or OCSP service availability.
|
||||
|
||||
Normally when configuring Apache to handle OCSP stapling, it proceeds to fetch
|
||||
the initial response from the OCSP server only during the handshake of next
|
||||
incoming request after the restart. Upon requesting a new OCSP response from the
|
||||
OCSP server pointed by the certificate, Apache overwrites the already existing
|
||||
cached response regardless of the validity of the received response.
|
||||
|
||||
Certbot tries to fix these issues by configuring the internal expiry of the
|
||||
Apache OCSP staple cache close to the expiry of the actual OCSP staple as well
|
||||
as by backing up and restoring the existing OCSP staple cache file when restarting
|
||||
Apache process.
|
||||
|
||||
The OCSP prefetching can be enabled with command line flag `--prefetch-ocsp`.
|
||||
|
||||
.. _command-line:
|
||||
|
||||
Certbot command-line options
|
||||
|
||||
@@ -25,6 +25,7 @@ P256_KEY = test_util.load_vector('nistp256_key.pem')
|
||||
P256_CERT_PATH = test_util.vector_path('cert-nosans_nistp256.pem')
|
||||
P256_CERT = test_util.load_vector('cert-nosans_nistp256.pem')
|
||||
|
||||
|
||||
class InitSaveKeyTest(test_util.TempDirTestCase):
|
||||
"""Tests for certbot.crypto_util.init_save_key."""
|
||||
def setUp(self):
|
||||
@@ -369,8 +370,10 @@ class Sha256sumTest(unittest.TestCase):
|
||||
"""Tests for certbot.crypto_util.notAfter"""
|
||||
def test_sha256sum(self):
|
||||
from certbot.crypto_util import sha256sum
|
||||
self.assertEqual(sha256sum(CERT_PATH),
|
||||
'914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e')
|
||||
self.assertEqual(
|
||||
sha256sum(CERT_PATH),
|
||||
'914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e'
|
||||
)
|
||||
|
||||
|
||||
class CertAndChainFromFullchainTest(unittest.TestCase):
|
||||
@@ -388,5 +391,16 @@ class CertAndChainFromFullchainTest(unittest.TestCase):
|
||||
self.assertEqual(chain_out, chain_pem)
|
||||
|
||||
|
||||
class CertFingerprintTest(unittest.TestCase):
|
||||
"""Tests for certbot.crypto_util.cert_sha1_fingerprint"""
|
||||
|
||||
def test_cert_sha1_fingerprint(self):
|
||||
from certbot.crypto_util import cert_sha1_fingerprint
|
||||
self.assertEqual(
|
||||
cert_sha1_fingerprint(CERT_PATH),
|
||||
b'\t\xf8\xce\x01E\r(\x84g\xc32j\xc0E~5\x199\xc7.'
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
||||
@@ -75,13 +75,14 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
||||
self.assertEqual(checker.broken, True)
|
||||
|
||||
@mock.patch('certbot._internal.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot._internal.ocsp.crypto_util.notAfter')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_ocsp_revoked(self, mock_run, mock_determine):
|
||||
def test_ocsp_revoked(self, mock_run, mock_na, mock_determine):
|
||||
now = pytz.UTC.fromutc(datetime.utcnow())
|
||||
cert_obj = mock.MagicMock()
|
||||
cert_obj.cert = "x"
|
||||
cert_obj.chain = "y"
|
||||
cert_obj.target_expiry = now + timedelta(hours=2)
|
||||
mock_na.return_value = now + timedelta(hours=2)
|
||||
|
||||
self.checker.broken = True
|
||||
mock_determine.return_value = ("", "")
|
||||
@@ -99,7 +100,7 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
||||
self.assertEqual(mock_run.call_count, 2)
|
||||
|
||||
# cert expired
|
||||
cert_obj.target_expiry = now
|
||||
mock_na.return_value = now
|
||||
mock_determine.return_value = ("", "")
|
||||
count_before = mock_determine.call_count
|
||||
self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
|
||||
@@ -136,6 +137,40 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
|
||||
self.assertEqual(mock_log.info.call_count, 1)
|
||||
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_ocsp_response_get_times(self, mock_run):
|
||||
mock_run.return_value = ocsp_times_example
|
||||
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
|
||||
self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10))
|
||||
self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0))
|
||||
self.assertEqual(nextUpdate, datetime(2020, 1, 31, 11, 0))
|
||||
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_ocsp_response_get_times_no_nextupdate(self, mock_run):
|
||||
mock_run.return_value = ocsp_times_example_nonext
|
||||
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
|
||||
self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10))
|
||||
self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0))
|
||||
self.assertEqual(nextUpdate, None)
|
||||
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_ocsp_response_get_times_badoutput(self, mock_run):
|
||||
mock_run.return_value = ("Something unparsable", "")
|
||||
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
|
||||
self.assertEqual(producedAt, None)
|
||||
self.assertEqual(thisUpdate, None)
|
||||
self.assertEqual(nextUpdate, None)
|
||||
|
||||
@mock.patch('certbot._internal.ocsp.logger')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_ocsp_response_get_times_error(self, mock_run, mock_log):
|
||||
mock_run.side_effect = errors.SubprocessError
|
||||
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
|
||||
self.assertEqual(producedAt, None)
|
||||
self.assertEqual(thisUpdate, None)
|
||||
self.assertEqual(nextUpdate, None)
|
||||
self.assertEqual(mock_log.info.call_count, 1)
|
||||
|
||||
|
||||
@unittest.skipIf(not ocsp_lib,
|
||||
reason='This class tests functionalities available only on cryptography>=2.5.0')
|
||||
@@ -152,20 +187,26 @@ class OSCPTestCryptography(unittest.TestCase):
|
||||
self.cert_obj = mock.MagicMock()
|
||||
self.cert_obj.cert = self.cert_path
|
||||
self.cert_obj.chain = self.chain_path
|
||||
|
||||
def _call_expirymock(self, func, *args, **kwargs):
|
||||
"""Call function with mocked certificate expiry time"""
|
||||
now = pytz.UTC.fromutc(datetime.utcnow())
|
||||
self.cert_obj.target_expiry = now + timedelta(hours=2)
|
||||
with mock.patch('certbot._internal.ocsp.crypto_util.notAfter') as mock_na:
|
||||
mock_na.return_value = now + timedelta(hours=2)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
@mock.patch('certbot._internal.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot._internal.ocsp._check_ocsp_cryptography')
|
||||
def test_ensure_cryptography_toggled(self, mock_revoke, mock_determine):
|
||||
mock_determine.return_value = ('http://example.com', 'example.com')
|
||||
self.checker.ocsp_revoked(self.cert_obj)
|
||||
self._call_expirymock(self.checker.ocsp_revoked, self.cert_obj)
|
||||
|
||||
mock_revoke.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com')
|
||||
mock_revoke.assert_called_once_with(self.cert_path, self.chain_path,
|
||||
'http://example.com', None)
|
||||
|
||||
def test_revoke(self):
|
||||
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
|
||||
revoked = self.checker.ocsp_revoked(self.cert_obj)
|
||||
revoked = self._call_expirymock(self.checker.ocsp_revoked, self.cert_obj)
|
||||
self.assertTrue(revoked)
|
||||
|
||||
def test_responder_is_issuer(self):
|
||||
@@ -175,7 +216,7 @@ class OSCPTestCryptography(unittest.TestCase):
|
||||
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED,
|
||||
ocsp_lib.OCSPResponseStatus.SUCCESSFUL) as mocks:
|
||||
mocks['mock_response'].return_value.responder_name = issuer.subject
|
||||
self.checker.ocsp_revoked(self.cert_obj)
|
||||
self._call_expirymock(self.checker.ocsp_revoked, self.cert_obj)
|
||||
# Here responder and issuer are the same. So only the signature of the OCSP
|
||||
# response is checked (using the issuer/responder public key).
|
||||
self.assertEqual(mocks['mock_check'].call_count, 1)
|
||||
@@ -190,7 +231,7 @@ class OSCPTestCryptography(unittest.TestCase):
|
||||
|
||||
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED,
|
||||
ocsp_lib.OCSPResponseStatus.SUCCESSFUL) as mocks:
|
||||
self.checker.ocsp_revoked(self.cert_obj)
|
||||
self._call_expirymock(self.checker.ocsp_revoked, self.cert_obj)
|
||||
# Here responder and issuer are not the same. Two signatures will be checked then,
|
||||
# first to verify the responder cert (using the issuer public key), second to
|
||||
# to verify the OCSP response itself (using the responder public key).
|
||||
@@ -271,6 +312,40 @@ class OSCPTestCryptography(unittest.TestCase):
|
||||
revoked = self.checker.ocsp_revoked(self.cert_obj)
|
||||
self.assertFalse(revoked)
|
||||
|
||||
def test_ocsp_times_cryptography(self):
|
||||
with mock.patch('certbot._internal.ocsp.open', mock.mock_open(read_data="")):
|
||||
with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load:
|
||||
resp = mock.MagicMock()
|
||||
resp.produced_at = datetime(2020, 1, 2, 9, 9)
|
||||
resp.this_update = datetime(2020, 1, 2, 8, 8)
|
||||
resp.next_update = datetime(2020, 1, 3, 4, 4)
|
||||
mock_load.return_value = resp
|
||||
|
||||
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
|
||||
self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9))
|
||||
self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8))
|
||||
self.assertEqual(next_update, datetime(2020, 1, 3, 4, 4))
|
||||
|
||||
def test_ocsp_times_cryptography_no_nextupdate(self):
|
||||
with mock.patch('certbot._internal.ocsp.open', mock.mock_open(read_data="")):
|
||||
with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load:
|
||||
resp = mock.MagicMock()
|
||||
resp.produced_at = datetime(2020, 1, 2, 9, 9)
|
||||
resp.this_update = datetime(2020, 1, 2, 8, 8)
|
||||
resp.next_update = None
|
||||
mock_load.return_value = resp
|
||||
|
||||
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
|
||||
self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9))
|
||||
self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8))
|
||||
self.assertEqual(next_update, None)
|
||||
|
||||
def test_ocsp_times_cryptography_error(self):
|
||||
with mock.patch('certbot._internal.ocsp.open', mock.mock_open(read_data="")) as mock_open:
|
||||
mock_open.side_effect = OSError
|
||||
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
|
||||
self.assertEqual([produced_at, this_update, next_update], [None, None, None])
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _ocsp_mock(certificate_status, response_status,
|
||||
@@ -370,6 +445,17 @@ revoked
|
||||
""",
|
||||
"""Response verify OK""")
|
||||
|
||||
ocsp_times_example = ("""
|
||||
Produced At: Jan 24 11:10:00 2020 GMT
|
||||
This Update: Jan 24 11:00:00 2020 GMT
|
||||
Next Update: Jan 31 11:00:00 2020 GMT
|
||||
""", "")
|
||||
|
||||
ocsp_times_example_nonext = ("""
|
||||
Produced At: Jan 24 11:10:00 2020 GMT
|
||||
This Update: Jan 24 11:00:00 2020 GMT
|
||||
""", "")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
||||
Reference in New Issue
Block a user