Compare commits

...

13 Commits

Author SHA1 Message Date
Joona Hoikkala
c63607b7f9 Conditional DBM handling 2018-08-24 16:36:32 +03:00
Joona Hoikkala
85187ea90d Try to force dbm module 2018-08-23 14:07:59 +03:00
Joona Hoikkala
506eac2189 Hack around default dbm engine selection 2018-08-23 00:40:53 +03:00
Joona Hoikkala
899bb15b61 Debug 2018-08-22 23:00:13 +03:00
Joona Hoikkala
35c0d79390 Ensure the correct dbm module is used for all envs 2018-08-22 12:04:37 +03:00
Joona Hoikkala
ba9de53768 Revert "Figure out the DBM implementation dynamically"
This reverts commit e4834da2c1.
2018-08-22 11:32:24 +03:00
Joona Hoikkala
e4834da2c1 Figure out the DBM implementation dynamically 2018-08-22 10:32:36 +03:00
Joona Hoikkala
34899c1c14 Py3 bytes 2018-08-21 19:56:31 +03:00
Joona Hoikkala
ff5a8a83b4 Test fixes 2018-08-21 18:50:54 +03:00
Joona Hoikkala
9366633ae6 Make cert pem file read work in Py3 2018-08-21 18:16:39 +03:00
Joona Hoikkala
cec96a523c Merge remote-tracking branch 'origin/master' into ocsp_apache 2018-08-21 16:31:56 +03:00
Joona Hoikkala
c20c722acf Finalize tests 2018-08-21 16:30:41 +03:00
Joona Hoikkala
118bf1d930 OCSP prefetching functionality 2018-08-21 00:38:40 +03:00
11 changed files with 653 additions and 45 deletions

View File

@@ -1,7 +1,11 @@
""" Utility functions for certbot-apache plugin """
import binascii
import os
import six
import struct
import time
from certbot import crypto_util
from certbot import util
def get_mod_deps(mod_name):
@@ -104,3 +108,47 @@ def parse_define_file(filepath, varname):
def unique_id():
""" Returns an unique id to be used as a VirtualHost identifier"""
return binascii.hexlify(os.urandom(16)).decode("utf-8")
def get_apache_ocsp_struct(ttl, ocsp_response):
"""Create Apache OCSP response structure to be used in response cache
:param int ttl: Time-To-Live in seocnds
:param str ocsp_response: OCSP response data
:returns: Apache OCSP structure
:rtype: `str`
"""
ttl = time.time() + ttl
# As microseconds
ttl_struct = struct.pack('l', int(ttl*1000000))
return b'\x01'.join([ttl_struct, ocsp_response])
def certid_sha1_hex(cert_path):
"""Hex representation of certificate SHA1 fingerprint
:param str cert_path: File path to certificate
:returns: Hex representation SHA1 fingerprint of certificate
:rtype: `str`
"""
sha1_hex = binascii.hexlify(certid_sha1(cert_path))
if isinstance(sha1_hex, six.binary_type):
return sha1_hex.decode('utf-8') # pragma: no cover
return sha1_hex # pragma: no cover
def certid_sha1(cert_path):
"""SHA1 fingerprint of certificate
:param str cert_path: File path to certificate
:returns: SHA1 fingerprint bytestring
:rtype: `str`
"""
return crypto_util.cert_sha1_fingerprint(cert_path)

View File

@@ -1,11 +1,13 @@
"""Apache Configuration based off of Augeas Configurator."""
# pylint: disable=too-many-lines
import dbm
import copy
import fnmatch
import logging
import os
import pkg_resources
import re
import shutil
import six
import socket
import time
@@ -18,12 +20,13 @@ from acme.magic_typing import Any, DefaultDict, Dict, List, Set, Union # pylint
from certbot import errors
from certbot import interfaces
from certbot import ocsp
from certbot import util
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
from certbot.plugins import common
from certbot.plugins.util import path_surgery
from certbot.plugins.enhancements import AutoHSTSEnhancement
from certbot.plugins.enhancements import AutoHSTSEnhancement, OCSPPrefetchEnhancement
from certbot_apache import apache_util
from certbot_apache import augeas_configurator
@@ -36,6 +39,12 @@ from certbot_apache import tls_sni_01
from collections import defaultdict
try:
import dbm.ndbm as dbm # pragma: no cover
except ImportError: # pragma: no cover
# dbm.ndbm only available on Python3
import dbm # pragma: no cover
logger = logging.getLogger(__name__)
@@ -188,6 +197,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
# Temporary state for AutoHSTS enhancement
self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]]
self._ocsp_prefetch = {} # type: Dict[str, str]
self._ocsp_dbm_bsddb = False
# These will be set in the prepare function
self._prepared = False
@@ -1692,7 +1703,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
self.parser.find_dir("SSLCertificateKeyFile",
lineage.key_path, vhost.path))
def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
def _enable_ocsp_stapling(self, ssl_vhost, unused_options, prefetch=False):
"""Enables OCSP Stapling
In OCSP, each client (e.g. browser) would have to query the
@@ -1713,8 +1724,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
:param unused_options: Not currently used
:type unused_options: Not Available
:returns: Success, general_vhost (HTTP vhost)
:rtype: (bool, :class:`~certbot_apache.obj.VirtualHost`)
:param prefetch: Use OCSP prefetching
:type prefetch: bool
"""
min_apache_ver = (2, 3, 3)
@@ -1723,8 +1734,15 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"Unable to set OCSP directives.\n"
"Apache version is below 2.3.3.")
if "socache_shmcb_module" not in self.parser.modules:
self.enable_mod("socache_shmcb")
if prefetch:
if "socache_dbm_module" not in self.parser.modules:
self.enable_mod("socache_dbm")
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
cache_dir = ["dbm:"+cache_path]
else:
if "socache_shmcb_module" not in self.parser.modules:
self.enable_mod("socache_shmcb")
cache_dir = ["shmcb:/var/run/apache2/stapling_cache(128000)"]
# Check if there's an existing SSLUseStapling directive on.
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
@@ -1745,8 +1763,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
"SSLStaplingCache",
["shmcb:/var/run/apache2/stapling_cache(128000)"])
"SSLStaplingCache", cache_dir)
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
ssl_vhost.filep)
@@ -2171,7 +2188,17 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
self.config_test()
if not self._ocsp_prefetch:
# Try to populate OCSP prefetch structure from pluginstorage
self._ocsp_prefetch_fetch_state()
if self._ocsp_prefetch:
# OCSP prefetching is enabled, so back up the db
self._ocsp_prefetch_backup_db()
self._reload()
if self._ocsp_prefetch:
# Restore the backed up dbm database
self._ocsp_prefetch_restore_db()
def _reload(self):
"""Reloads the Apache server.
@@ -2494,5 +2521,201 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# Update AutoHSTS storage (We potentially removed vhosts from managed)
self._autohsts_save_state()
def _ensure_ocsp_dirs(self):
"""Makes sure that the OCSP directory paths exist."""
ocsp_work = os.path.join(self.config.work_dir, "ocsp")
ocsp_save = os.path.join(self.config.config_dir, "ocsp")
for path in [ocsp_work, ocsp_save]:
if not os.path.isdir(path):
os.makedirs(path)
os.chmod(path, 0o755)
def _ocsp_refresh_if_needed(self, pf_obj):
"""Refreshes OCSP response for a certiifcate if it's due
:param dict pf_obj: OCSP prefetch object from pluginstorage
:returns: If OCSP response was updated
:rtype: bool
"""
ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL
if ttl < time.time():
self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"])
return True
return False
def _ocsp_refresh(self, cert_path, chain_path):
"""Refresh the OCSP response for a certificate
:param str cert_path: Filesystem path to certificate file
:param str chain_path: Filesystem path to certificate chain file
"""
self._ensure_ocsp_dirs()
handler = ocsp.OCSPResponseHandler(cert_path, chain_path)
ocsp_workfile = os.path.join(
self.config.work_dir, "ocsp",
apache_util.certid_sha1_hex(cert_path))
if handler.ocsp_request_to_file(ocsp_workfile):
# Guaranteed good response
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache")
db = self._ocsp_dbm_open(cache_path)
cert_sha = apache_util.certid_sha1(cert_path)
db[cert_sha] = self._ocsp_response_dbm(ocsp_workfile)
self._ocsp_dbm_close(db)
else:
logger.warning("Encountered an issue while trying to prefetch OCSP "
"response for certificate: %s", cert_path)
def _ensure_ocsp_prefetch_compatibility(self):
"""Make sure that the operating system supports the required libraries
to manage Apache DBM files.
:raises: errors.NotSupportedError
"""
try:
import bsddb
except ImportError:
import dbm
if not hasattr(dbm, 'ndbm'):
msg = ("Unfortunately your operating system does not have a "
"compatible database module available for managing "
"Apache cache database.")
raise errors.NotSupportedError(msg)
def _ocsp_dbm_open(self, filepath):
"""Helper method to open an DBM file in a way that depends on the platform
that Certbot is run on. Returns an open database structure."""
try:
import bsddb
self._ocsp_dbm_bsddb = True
cache_path = filepath + ".db"
database = bsddb.hashopen(cache_path, 'c')
except ImportError:
# Python3 doesn't have bsddb module, so we use dbm.ndbm instead
import dbm
database = dbm.ndbm.open(filepath, 'c')
return database
def _ocsp_dbm_close(self, database):
"""Helper method to sync and close a DBM file, in a way required by the
used dbm implementation."""
if self._ocsp_dbm_bsddb:
database.sync()
database.close()
else:
database.close()
def _ocsp_response_dbm(self, workfile):
"""Creates a dbm entry for OCSP response data
:param str workfile: File path for raw OCSP response
:returns: OCSP response cache data that Apache can use
:rtype: string
"""
with open(workfile, 'rb') as fh:
response = fh.read()
ttl = constants.OCSP_APACHE_TTL
return apache_util.get_apache_ocsp_struct(ttl, response)
def _ocsp_prefetch_save(self, cert_path, chain_path):
"""Saves status of current OCSP prefetch, including the last update
time to determine if an update is needed on later run.
:param str cert_path: Filesystem path to certificate
:param str chain_path: Filesystem path to certificate chain file
"""
status = {
"lastupdate": time.time(),
"cert_path": cert_path,
"chain_path": chain_path
}
cert_id = apache_util.certid_sha1_hex(cert_path)
self._ocsp_prefetch[cert_id] = status
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
self.storage.save()
def _ocsp_prefetch_fetch_state(self):
"""
Populates the OCSP prefetch state from the pluginstorage.
"""
try:
self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch")
except KeyError:
self._ocsp_prefetch = dict()
def _ocsp_prefetch_backup_db(self):
"""
Copies the active dbm file to work directory.
"""
self._ensure_ocsp_dirs()
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
try:
shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp"))
except IOError:
logger.debug("Encountered an issue while trying to backup OCSP dbm file")
def _ocsp_prefetch_restore_db(self):
"""
Restores the active dbm file from work directory.
"""
self._ensure_ocsp_dirs()
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
work_file_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
try:
shutil.copy2(work_file_path, cache_path)
except IOError:
logger.debug("Encountered an issue when trying to restore OCSP dbm file")
def enable_ocsp_prefetch(self, lineage, domains):
"""Enable OCSP Stapling and prefetching of the responses.
In OCSP, each client (e.g. browser) would have to query the
OCSP Responder to validate that the site certificate was not revoked.
Enabling OCSP Stapling, would allow the web-server to query the OCSP
Responder, and staple its response to the offered certificate during
TLS. i.e. clients would not have to query the OCSP responder.
"""
# Fail early if we are not able to support this
self._ensure_ocsp_prefetch_compatibility()
prefetch_vhosts = set()
for domain in domains:
matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
# We should be handling only SSL vhosts
for vh in matched_vhosts:
if vh.ssl:
prefetch_vhosts.add(vh)
if prefetch_vhosts:
for vh in prefetch_vhosts:
self._enable_ocsp_stapling(vh, None, prefetch=True)
self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path)
self.restart()
logger.warning(apache_util.certid_sha1_hex(lineage.cert_path))
self._ocsp_refresh(lineage.cert_path, lineage.chain_path)
def update_ocsp_prefetch(self, _unused_lineage):
"""Checks all certificates that are managed by OCSP prefetch, and
refreshes OCSP responses for them if required."""
self._ocsp_prefetch_fetch_state()
if not self._ocsp_prefetch:
# No OCSP prefetching enabled for any certificate
return
for _, pf in self._ocsp_prefetch.items():
if self._ocsp_refresh_if_needed(pf):
# Save the status to pluginstorage
self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"])
AutoHSTSEnhancement.register(ApacheConfigurator) # pylint: disable=no-member
OCSPPrefetchEnhancement.register(ApacheConfigurator) # pylint: disable=no-member

View File

@@ -61,3 +61,9 @@ AUTOHSTS_FREQ = 172800
MANAGED_COMMENT = "DO NOT REMOVE - Managed by Certbot"
MANAGED_COMMENT_ID = MANAGED_COMMENT+", VirtualHost id: {0}"
"""Managed by Certbot comments and the VirtualHost identification template"""
OCSP_APACHE_TTL = 432000
"""Apache TTL for OCSP response: 5 days"""
OCSP_INTERNAL_TTL = 86400
"""Internal TTL for OCSP response: 1 day"""

View File

@@ -0,0 +1,167 @@
"""Test for certbot_apache.configurator OCSP Prefetching functionality"""
import os
import unittest
import mock
# six is used in mock.patch()
import six # pylint: disable=unused-import
from certbot_apache.tests import util
class OCSPPrefetchTest(util.ApacheTest):
"""Tests for OCSP Prefetch feature"""
# pylint: disable=protected-access
def setUp(self): # pylint: disable=arguments-differ
super(OCSPPrefetchTest, self).setUp()
self.config = util.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir)
self.lineage = mock.MagicMock(cert_path="cert", chain_path="chain")
self.config.parser.modules.add("headers_module")
self.config.parser.modules.add("mod_headers.c")
self.config.parser.modules.add("ssl_module")
self.config.parser.modules.add("mod_ssl.c")
self.config.parser.modules.add("socache_dbm_module")
self.config.parser.modules.add("mod_socache_dbm.c")
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
def call_mocked(self, func, *args, **kwargs):
"""Helper method to call functins with mock stack"""
ver_path = "certbot_apache.configurator.ApacheConfigurator.get_version"
cry_path = "certbot.crypto_util.cert_sha1_fingerprint"
with mock.patch(ver_path) as mock_ver:
mock_ver.return_value = (2, 4, 10)
with mock.patch(cry_path) as mock_cry:
mock_cry.return_value = b'j\x056\x1f\xfa\x08B\xe8D\xa1Bn\xeb*A\xebWx\xdd\xfe'
return func(*args, **kwargs)
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
@mock.patch("certbot_apache.configurator.ApacheConfigurator.enable_mod")
def test_ocsp_prefetch_enable_mods(self, mock_enable, _restart):
self.config.parser.modules.discard("socache_dbm_module")
self.config.parser.modules.discard("mod_socache_dbm.c")
self.config.parser.modules.discard("headers_module")
self.config.parser.modules.discard("mod_header.c")
ref_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
with mock.patch(ref_path):
self.call_mocked(self.config.enable_ocsp_prefetch,
self.lineage,
["ocspvhost.com"])
self.assertTrue(mock_enable.called)
self.assertEquals(len(self.config._ocsp_prefetch), 1)
@mock.patch("certbot_apache.constants.OCSP_INTERNAL_TTL", 0)
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
def test_ocsp_prefetch_refresh(self, _mock_restart):
db_path = os.path.join(self.config_dir, "ocsp", "ocsp_cache")
def ocsp_req_mock(workfile):
"""Method to mock the OCSP request and write response to file"""
with open(workfile, 'w') as fh:
fh.write("MOCKRESPONSE")
return True
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
self.call_mocked(self.config.enable_ocsp_prefetch,
self.lineage,
["ocspvhost.com"])
odbm = self.config._ocsp_dbm_open(db_path)
self.assertEquals(len(odbm.keys()), 1)
# The actual response data is prepended by Apache timestamp
self.assertTrue(odbm[odbm.keys()[0]].endswith(b'MOCKRESPONSE'))
self.config._ocsp_dbm_close(odbm)
with mock.patch(ocsp_path, side_effect=ocsp_req_mock) as mock_ocsp:
self.call_mocked(self.config.update_ocsp_prefetch, None)
self.assertTrue(mock_ocsp.called)
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
def test_ocsp_prefetch_refresh_noop(self, _mock_restart):
def ocsp_req_mock(workfile):
"""Method to mock the OCSP request and write response to file"""
with open(workfile, 'w') as fh:
fh.write("MOCKRESPONSE")
return True
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
self.call_mocked(self.config.enable_ocsp_prefetch,
self.lineage,
["ocspvhost.com"])
self.assertEquals(len(self.config._ocsp_prefetch), 1)
refresh_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
with mock.patch(refresh_path) as mock_refresh:
self.call_mocked(self.config.update_ocsp_prefetch, None)
self.assertFalse(mock_refresh.called)
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
def test_ocsp_prefetch_backup_db(self, _mock_test):
db_path = os.path.join(self.config_dir, "ocsp", "ocsp_cache")
def ocsp_del_db():
"""Side effect of _reload() that deletes the DBM file, like Apache
does when restarting"""
full_db_path = db_path + ".db"
os.remove(full_db_path)
self.assertFalse(os.path.isfile(full_db_path))
self.config._ensure_ocsp_dirs()
odbm = self.config._ocsp_dbm_open(db_path)
odbm["mock_key"] = b'mock_value'
self.config._ocsp_dbm_close(odbm)
# Mock OCSP prefetch dict to signify that there should be a db
self.config._ocsp_prefetch = {"mock": "value"}
rel_path = "certbot_apache.configurator.ApacheConfigurator._reload"
with mock.patch(rel_path, side_effect=ocsp_del_db) as mock_reload:
self.config.restart()
odbm = self.config._ocsp_dbm_open(db_path)
self.assertEquals(odbm["mock_key"], b'mock_value')
self.config._ocsp_dbm_close(odbm)
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
@mock.patch("certbot_apache.configurator.ApacheConfigurator._reload")
def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test):
self.config._ensure_ocsp_dirs()
log_path = "certbot_apache.configurator.logger.debug"
log_string = "Encountered an issue while trying to backup OCSP dbm file"
log_string2 = "Encountered an issue when trying to restore OCSP dbm file"
self.config._ocsp_prefetch = {"mock": "value"}
with mock.patch("shutil.copy2", side_effect=IOError):
with mock.patch(log_path) as mock_log:
self.config.restart()
self.assertTrue(mock_log.called)
self.assertEquals(mock_log.call_count, 2)
self.assertTrue(log_string in mock_log.call_args_list[0][0][0])
self.assertTrue(log_string2 in mock_log.call_args_list[1][0][0])
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
log_path = "certbot_apache.configurator.logger.warning"
with mock.patch(ocsp_path) as mock_ocsp:
mock_ocsp.return_value = False
with mock.patch(log_path) as mock_log:
self.call_mocked(self.config.enable_ocsp_prefetch,
self.lineage,
["ocspvhost.com"])
self.assertTrue(mock_log.called)
self.assertTrue(
"trying to prefetch OCSP" in mock_log.call_args[0][0])
@mock.patch("certbot_apache.configurator.ApacheConfigurator._ocsp_refresh_if_needed")
def test_ocsp_prefetch_update_noop(self, mock_refresh):
self.config.update_ocsp_prefetch(None)
self.assertFalse(mock_refresh.called)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View File

@@ -60,6 +60,7 @@ CLI_DEFAULTS = dict(
redirect=None,
auto_hsts=False,
hsts=None,
ocsp_prefetch=False,
uir=None,
staple=None,
strict_permissions=False,

View File

@@ -14,6 +14,7 @@ import six
import zope.component
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes # type: ignore
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
@@ -224,6 +225,18 @@ def verify_renewable_cert(renewable_cert):
verify_cert_matches_priv_key(renewable_cert.cert, renewable_cert.privkey)
def load_cert(cert_path):
"""Reads the certificate PEM file and returns a cryptography.x509 object
:param str cert_path: Path to the certificate
:rtype `cryptography.x509`:
:returns: x509 certificate object
"""
with open(cert_path, 'rb') as fh:
cert_pem = fh.read()
return x509.load_pem_x509_certificate(cert_pem, default_backend())
def verify_renewable_cert_sig(renewable_cert):
""" Verifies the signature of a `.storage.RenewableCert` object.
@@ -234,8 +247,7 @@ def verify_renewable_cert_sig(renewable_cert):
try:
with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes]
chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend())
with open(renewable_cert.cert, 'rb') as cert_file: # type: IO[bytes]
cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
cert = load_cert(renewable_cert.cert)
pk = chain.public_key()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
@@ -458,6 +470,13 @@ def sha256sum(filename):
sha256.update(f.read())
return sha256.hexdigest()
def cert_sha1_fingerprint(cert_path):
"""Get sha1 digest of the certificate fingerprint"""
cert = load_cert(cert_path)
return cert.fingerprint(hashes.SHA1())
def cert_and_chain_from_fullchain(fullchain_pem):
"""Split fullchain_pem into cert_pem and chain_pem

View File

@@ -108,3 +108,12 @@ class ConfigurationError(Error):
class MissingCommandlineFlag(Error):
"""A command line argument was missing in noninteractive usage"""
# OCSP errors:
class OCSPRevokedError(Error):
"""Certificate is revoked based on the OCSP response"""
class OCSPRequestError(Error):
"""Error in OCSP request"""

View File

@@ -941,6 +941,8 @@ def enhance(config, plugins):
lineage = cert_manager.lineage_for_certname(config, config.certname)
if not config.chain_path:
config.chain_path = lineage.chain_path
if not config.cert_path:
config.cert_path = lineage.cert_path
if oldstyle_enh:
le_client = _init_le_client(config, authenticator=None, installer=installer)
le_client.enhance_config(domains, config.chain_path, ask_redirect=False)

View File

@@ -1,15 +1,109 @@
"""Tools for checking certificate revocation."""
import logging
import os
import re
from subprocess import Popen, PIPE
from certbot import crypto_util
from certbot import errors
from certbot import util
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
logger = logging.getLogger(__name__)
class RevocationChecker(object):
class OCSPBase(object):
"""Base class for OCSP request operations"""
def determine_ocsp_server(self, cert_path):
"""Extract the OCSP server host from a certificate.
:param str cert_path: Path to the cert we're checking OCSP for
:rtype tuple:
:returns: (OCSP server URL or None, OCSP server host or None)
"""
url = self._ocsp_host_from_cert(cert_path)
if url:
url = url.strip()
host = url.partition("://")[2].rstrip("/")
if host:
return url, host
else:
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
return None, None
def _ocsp_host_from_cert(self, cert_path):
"""Helper method for determine_ocsp_server to read the actual OCSP
server information from a certificate file"""
cert = crypto_util.load_cert(cert_path)
ocsp_authinfo = cert.extensions.get_extension_for_oid(
ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
for obj in ocsp_authinfo.value:
if obj.access_method == AuthorityInformationAccessOID.OCSP:
return obj.access_location.value
return None
def _request_status(self, response):
"""Checks that the OCSP response was successful from the text output"""
pattern = re.compile(r"OCSP Response Status: (.*)\n")
return "successful (0x0)" in pattern.findall(response)
def _response_successful(self, response, cert_path):
"""Checks that the certificate is not revoked"""
pattern = re.compile(r"{0}: (.*)\n".format(cert_path))
if "revoked" in pattern.findall(response):
raise errors.OCSPRevokedError("Certificate is revoked")
return "good" in pattern.findall(response)
class OCSPResponseHandler(OCSPBase):
"""Class for handling OCSP requests"""
def __init__(self, cert_path, chain_path):
self.cert_path = cert_path
self.chain_path = chain_path
def ocsp_request_to_file(self, filepath):
"""Make OCSP request and save the response to a file.
:param str filepath: Path to save the OCSP response to.
:returns: True if the response was successfully saved
:rtype: bool
:raises errors.OCSPRevokedError: If certificate is revoked.
"""
url, _ = self.determine_ocsp_server(self.cert_path)
cmd = ["openssl", "ocsp",
"-no_nonce",
"-issuer", self.chain_path,
"-cert", self.cert_path,
"-url", url,
"-CAfile", self.chain_path,
"-verify_other", self.chain_path,
"-trust_other",
"-respout", filepath,
"-text"]
try:
output, _ = util.run_script(cmd, log=logger.debug)
except errors.SubprocessError:
logger.info("OCSP check failed for %s (are we offline?)", self.cert_path)
return False
if not self._request_status(output):
raise errors.OCSPRequestError("OCSP request returned an error")
try:
if self._response_successful(output, self.cert_path):
return os.path.isfile(filepath)
except errors.OCSPRevokedError:
logger.warning("Certificate %s is revoked.", self.cert_path)
raise
return False
class RevocationChecker(OCSPBase):
"This class figures out OCSP checking on this system, and performs it."
def __init__(self):
@@ -20,7 +114,7 @@ class RevocationChecker(object):
self.broken = True
return
# New versions of openssl want -header var=val, old ones want -header var val
# New versions of openssl want -header var=val, old ones want -header var val
test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"],
stdout=PIPE, stderr=PIPE, universal_newlines=True)
_out, err = test_host_format.communicate()
@@ -69,30 +163,6 @@ class RevocationChecker(object):
return _translate_ocsp_query(cert_path, output, err)
def determine_ocsp_server(self, cert_path):
"""Extract the OCSP server host from a certificate.
:param str cert_path: Path to the cert we're checking OCSP for
:rtype tuple:
:returns: (OCSP server URL or None, OCSP server host or None)
"""
try:
url, _err = util.run_script(
["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"],
log=logger.debug)
except errors.SubprocessError:
logger.info("Cannot extract OCSP URI from %s", cert_path)
return None, None
url = url.rstrip()
host = url.partition("://")[2].rstrip("/")
if host:
return url, host
else:
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
return None, None
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
"""Parse openssl's weird output to work out what it means."""

View File

@@ -143,6 +143,60 @@ class AutoHSTSEnhancement(object):
:type domains: str
"""
@six.add_metaclass(abc.ABCMeta)
class OCSPPrefetchEnhancement(object):
"""
Enhancement interface that installer plugins can implement in order to
provide functionality that prefetches an OCSP response and stores it
to be served for incoming client requests.
The plugins implementing new style enhancements are responsible of handling
the saving of configuration checkpoints as well as calling possible restarts
of managed software themselves. For update_ocsp_prefetch method, the installer
may have to call prepare() to finalize the plugin initialization.
Methods:
enable_ocsp_prefetch is called when the domain is configured to
serve OCSP responses using mechanism called OCSP Stapling.
update_ocsp_prefetch is called every time when Certbot is run using 'renew'
verb. Certbot should proceed to make a request to the OCSP server in order
to fetch an OCSP response and to store the recieved response, if valid.
"""
@abc.abstractmethod
def update_ocsp_prefetch(self, lineage, *args, **kwargs):
"""
Gets called for each lineage every time Certbot is run with 'renew' verb.
Implementation of this method should fetch a fresh OCSP response and if
valid, store it to be served for connecting clients.
:param lineage: Certificate lineage object
:type lineage: certbot.storage.RenewableCert
.. note:: prepare() method inherited from `interfaces.IPlugin` might need
to be called manually within implementation of this interface method
to finalize the plugin initialization.
"""
@abc.abstractmethod
def enable_ocsp_prefetch(self, lineage, domains, *args, **kwargs):
"""
Enables the OCSP enhancement, enabling OCSP Stapling functionality for
the controlled software, and sets it up for prefetching the responses
over the subsequent runs of Certbot renew.
:param lineage: Certificate lineage object
:type lineage: certbot.storage.RenewableCert
:param domains: List of domains in certificate to enhance
:type domains: str
"""
# This is used to configure internal new style enhancements in Certbot. These
# enhancement interfaces need to be defined in this file. Please do not modify
# this list from plugin code.
@@ -160,5 +214,19 @@ _INDEX = [
"updater_function": "update_autohsts",
"deployer_function": "deploy_autohsts",
"enable_function": "enable_autohsts"
},
{
"name": "OCSPPrefetch",
"cli_help": "Prefetch OCSP responses for certificates in order to be "+
"able to serve connecting clients fresh staple immediately",
"cli_flag": "--ocsp-prefetch",
"cli_flag_default": constants.CLI_DEFAULTS["ocsp_prefetch"],
"cli_groups": ["security", "enhance"],
"cli_dest": "ocsp_prefetch",
"cli_action": "store_true",
"class": OCSPPrefetchEnhancement,
"updater_function": "update_ocsp_prefetch",
"deployer_function": None,
"enable_function": "enable_ocsp_prefetch"
}
] # type: List[Dict[str, Any]]

View File

@@ -73,20 +73,16 @@ class OCSPTest(unittest.TestCase):
@mock.patch('certbot.ocsp.logger.info')
@mock.patch('certbot.util.run_script')
def test_determine_ocsp_server(self, mock_run, mock_info):
@mock.patch('certbot.ocsp.RevocationChecker._ocsp_host_from_cert')
def test_determine_ocsp_server(self, mock_host, mock_info):
uri = "http://ocsp.stg-int-x1.letsencrypt.org/"
host = "ocsp.stg-int-x1.letsencrypt.org"
mock_run.return_value = uri, ""
mock_host.return_value = uri
self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host))
mock_run.return_value = "ftp:/" + host + "/", ""
mock_host.return_value = "ftp:/" + host + "/"
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
self.assertEqual(mock_info.call_count, 1)
c = "confusion"
mock_run.side_effect = errors.SubprocessError(c)
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
@mock.patch('certbot.ocsp.logger')
@mock.patch('certbot.util.run_script')
def test_translate_ocsp(self, mock_run, mock_log):
@@ -111,7 +107,6 @@ class OCSPTest(unittest.TestCase):
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
self.assertEqual(mock_log.info.call_count, 1)
# pylint: disable=line-too-long
openssl_confused = ("", """
/etc/letsencrypt/live/example.org/cert.pem: good