Source code for certbot.crypto_util

"""Certbot client crypto utility functions.

.. todo:: Make the transition to use PSS rather than PKCS1_v1_5 when the server
    is capable of handling the signatures.

"""
import hashlib
import logging
import warnings

import pyrfc3339
import six
import zope.component
from OpenSSL import SSL  # type: ignore
from OpenSSL import crypto
# https://github.com/python/typeshed/tree/master/third_party/2/cryptography
from cryptography import x509  # type: ignore
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey

from acme import crypto_util as acme_crypto_util
from acme.magic_typing import IO  # pylint: disable=unused-import, no-name-in-module

from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import os

logger = logging.getLogger(__name__)


# High level functions
[docs]def init_save_key(key_size, key_dir, keyname="key-certbot.pem"): """Initializes and saves a privkey. Inits key and saves it in PEM format on the filesystem. .. note:: keyname is the attempted filename, it may be different if a file already exists at the path. :param int key_size: RSA key size in bits :param str key_dir: Key save directory. :param str keyname: Filename of key :returns: Key :rtype: :class:`certbot.util.Key` :raises ValueError: If unable to generate the key given key_size. """ try: key_pem = make_key(key_size) except ValueError as err: logger.error("", exc_info=True) raise err config = zope.component.getUtility(interfaces.IConfig) # Save file util.make_or_verify_dir(key_dir, 0o700, config.strict_permissions) key_f, key_path = util.unique_file( os.path.join(key_dir, keyname), 0o600, "wb") with key_f: key_f.write(key_pem) logger.debug("Generating key (%d bits): %s", key_size, key_path) return util.Key(key_path, key_pem)
[docs]def init_save_csr(privkey, names, path): """Initialize a CSR with the given private key. :param privkey: Key to include in the CSR :type privkey: :class:`certbot.util.Key` :param set names: `str` names to include in the CSR :param str path: Certificate save directory. :returns: CSR :rtype: :class:`certbot.util.CSR` """ config = zope.component.getUtility(interfaces.IConfig) csr_pem = acme_crypto_util.make_csr( privkey.pem, names, must_staple=config.must_staple) # Save CSR util.make_or_verify_dir(path, 0o755, config.strict_permissions) csr_f, csr_filename = util.unique_file( os.path.join(path, "csr-certbot.pem"), 0o644, "wb") with csr_f: csr_f.write(csr_pem) logger.debug("Creating CSR: %s", csr_filename) return util.CSR(csr_filename, csr_pem, "pem")
# WARNING: the csr and private key file are possible attack vectors for TOCTOU # We should either... # A. Do more checks to verify that the CSR is trusted/valid # B. Audit the parsing code for vulnerabilities
[docs]def valid_csr(csr): """Validate CSR. Check if `csr` is a valid CSR for the given domains. :param str csr: CSR in PEM. :returns: Validity of CSR. :rtype: bool """ try: req = crypto.load_certificate_request( crypto.FILETYPE_PEM, csr) return req.verify(req.get_pubkey()) except crypto.Error: logger.debug("", exc_info=True) return False
[docs]def csr_matches_pubkey(csr, privkey): """Does private key correspond to the subject public key in the CSR? :param str csr: CSR in PEM. :param str privkey: Private key file contents (PEM) :returns: Correspondence of private key to CSR subject public key. :rtype: bool """ req = crypto.load_certificate_request( crypto.FILETYPE_PEM, csr) pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, privkey) try: return req.verify(pkey) except crypto.Error: logger.debug("", exc_info=True) return False
[docs]def import_csr_file(csrfile, data): """Import a CSR file, which can be either PEM or DER. :param str csrfile: CSR filename :param str data: contents of the CSR file :returns: (`crypto.FILETYPE_PEM`, util.CSR object representing the CSR, list of domains requested in the CSR) :rtype: tuple """ PEM = crypto.FILETYPE_PEM load = crypto.load_certificate_request try: # Try to parse as DER first, then fall back to PEM. csr = load(crypto.FILETYPE_ASN1, data) except crypto.Error: try: csr = load(PEM, data) except crypto.Error: raise errors.Error("Failed to parse CSR file: {0}".format(csrfile)) domains = _get_names_from_loaded_cert_or_req(csr) # Internally we always use PEM, so re-encode as PEM before returning. data_pem = crypto.dump_certificate_request(PEM, csr) return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains
[docs]def make_key(bits): """Generate PEM encoded RSA key. :param int bits: Number of bits, at least 1024. :returns: new RSA key in PEM form with specified number of bits :rtype: str """ assert bits >= 1024 # XXX key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, bits) return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
[docs]def valid_privkey(privkey): """Is valid RSA private key? :param str privkey: Private key file contents in PEM :returns: Validity of private key. :rtype: bool """ try: return crypto.load_privatekey( crypto.FILETYPE_PEM, privkey).check() except (TypeError, crypto.Error): return False
[docs]def verify_renewable_cert(renewable_cert): """For checking that your certs were not corrupted on disk. Several things are checked: 1. Signature verification for the cert. 2. That fullchain matches cert and chain when concatenated. 3. Check that the private key matches the certificate. :param `.storage.RenewableCert` renewable_cert: cert to verify :raises errors.Error: If verification fails. """ verify_renewable_cert_sig(renewable_cert) verify_fullchain(renewable_cert) verify_cert_matches_priv_key(renewable_cert.cert, renewable_cert.privkey)
[docs]def verify_renewable_cert_sig(renewable_cert): """Verifies the signature of a `.storage.RenewableCert` object. :param `.storage.RenewableCert` renewable_cert: cert to verify :raises errors.Error: If signature verification fails. """ try: with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes] chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend()) with open(renewable_cert.cert, 'rb') as cert_file: # type: IO[bytes] cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend()) pk = chain.public_key() with warnings.catch_warnings(): verify_signed_payload(pk, cert.signature, cert.tbs_certificate_bytes, cert.signature_hash_algorithm) except (IOError, ValueError, InvalidSignature) as e: error_str = "verifying the signature of the cert located at {0} has failed. \ Details: {1}".format(renewable_cert.cert, e) logger.exception(error_str) raise errors.Error(error_str)
[docs]def verify_signed_payload(public_key, signature, payload, signature_hash_algorithm): """Check the signature of a payload. :param RSAPublicKey/EllipticCurvePublicKey public_key: the public_key to check signature :param bytes signature: the signature bytes :param bytes payload: the payload bytes :param cryptography.hazmat.primitives.hashes.HashAlgorithm signature_hash_algorithm: algorithm used to hash the payload :raises InvalidSignature: If signature verification fails. :raises errors.Error: If public key type is not supported """ with warnings.catch_warnings(): warnings.simplefilter("ignore") if isinstance(public_key, RSAPublicKey): # https://github.com/python/typeshed/blob/master/third_party/2/cryptography/hazmat/primitives/asymmetric/rsa.pyi verifier = public_key.verifier( # type: ignore signature, PKCS1v15(), signature_hash_algorithm ) verifier.update(payload) verifier.verify() elif isinstance(public_key, EllipticCurvePublicKey): verifier = public_key.verifier( signature, ECDSA(signature_hash_algorithm) ) verifier.update(payload) verifier.verify() else: raise errors.Error("Unsupported public key type")
[docs]def verify_cert_matches_priv_key(cert_path, key_path): """ Verifies that the private key and cert match. :param str cert_path: path to a cert in PEM format :param str key_path: path to a private key file :raises errors.Error: If they don't match. """ try: context = SSL.Context(SSL.SSLv23_METHOD) context.use_certificate_file(cert_path) context.use_privatekey_file(key_path) context.check_privatekey() except (IOError, SSL.Error) as e: error_str = "verifying the cert located at {0} matches the \ private key located at {1} has failed. \ Details: {2}".format(cert_path, key_path, e) logger.exception(error_str) raise errors.Error(error_str)
[docs]def verify_fullchain(renewable_cert): """ Verifies that fullchain is indeed cert concatenated with chain. :param `.storage.RenewableCert` renewable_cert: cert to verify :raises errors.Error: If cert and chain do not combine to fullchain. """ try: with open(renewable_cert.chain) as chain_file: # type: IO[str] chain = chain_file.read() with open(renewable_cert.cert) as cert_file: # type: IO[str] cert = cert_file.read() with open(renewable_cert.fullchain) as fullchain_file: # type: IO[str] fullchain = fullchain_file.read() if (cert + chain) != fullchain: error_str = "fullchain does not match cert + chain for {0}!" error_str = error_str.format(renewable_cert.lineagename) raise errors.Error(error_str) except IOError as e: error_str = "reading one of cert, chain, or fullchain has failed: {0}".format(e) logger.exception(error_str) raise errors.Error(error_str) except errors.Error as e: raise e
[docs]def pyopenssl_load_certificate(data): """Load PEM/DER certificate. :raises errors.Error: """ openssl_errors = [] for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1): try: return crypto.load_certificate(file_type, data), file_type except crypto.Error as error: # TODO: other errors? openssl_errors.append(error) raise errors.Error("Unable to load: {0}".format(",".join( str(error) for error in openssl_errors)))
def _load_cert_or_req(cert_or_req_str, load_func, typ=crypto.FILETYPE_PEM): try: return load_func(typ, cert_or_req_str) except crypto.Error: logger.error("", exc_info=True) raise def _get_sans_from_cert_or_req(cert_or_req_str, load_func, typ=crypto.FILETYPE_PEM): # pylint: disable=protected-access return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req( cert_or_req_str, load_func, typ))
[docs]def get_sans_from_cert(cert, typ=crypto.FILETYPE_PEM): """Get a list of Subject Alternative Names from a certificate. :param str cert: Certificate (encoded). :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1` :returns: A list of Subject Alternative Names. :rtype: list """ return _get_sans_from_cert_or_req( cert, crypto.load_certificate, typ)
def _get_names_from_cert_or_req(cert_or_req, load_func, typ): loaded_cert_or_req = _load_cert_or_req(cert_or_req, load_func, typ) return _get_names_from_loaded_cert_or_req(loaded_cert_or_req) def _get_names_from_loaded_cert_or_req(loaded_cert_or_req): # pylint: disable=protected-access return acme_crypto_util._pyopenssl_cert_or_req_all_names(loaded_cert_or_req)
[docs]def get_names_from_cert(csr, typ=crypto.FILETYPE_PEM): """Get a list of domains from a cert, including the CN if it is set. :param str cert: Certificate (encoded). :param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1` :returns: A list of domain names. :rtype: list """ return _get_names_from_cert_or_req( csr, crypto.load_certificate, typ)
[docs]def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM): """Dump certificate chain into a bundle. :param list chain: List of `crypto.X509` (or wrapped in :class:`josepy.util.ComparableX509`). """ # XXX: returns empty string when no chain is available, which # shuts up RenewableCert, but might not be the best solution... return acme_crypto_util.dump_pyopenssl_chain(chain, filetype)
[docs]def notBefore(cert_path): """When does the cert at cert_path start being valid? :param str cert_path: path to a cert in PEM format :returns: the notBefore value from the cert at cert_path :rtype: :class:`datetime.datetime` """ return _notAfterBefore(cert_path, crypto.X509.get_notBefore)
[docs]def notAfter(cert_path): """When does the cert at cert_path stop being valid? :param str cert_path: path to a cert in PEM format :returns: the notAfter value from the cert at cert_path :rtype: :class:`datetime.datetime` """ return _notAfterBefore(cert_path, crypto.X509.get_notAfter)
[docs]def _notAfterBefore(cert_path, method): """Internal helper function for finding notbefore/notafter. :param str cert_path: path to a cert in PEM format :param function method: one of ``crypto.X509.get_notBefore`` or ``crypto.X509.get_notAfter`` :returns: the notBefore or notAfter value from the cert at cert_path :rtype: :class:`datetime.datetime` """ # pylint: disable=redefined-outer-name with open(cert_path) as f: x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) # pyopenssl always returns bytes timestamp = method(x509) reformatted_timestamp = [timestamp[0:4], b"-", timestamp[4:6], b"-", timestamp[6:8], b"T", timestamp[8:10], b":", timestamp[10:12], b":", timestamp[12:]] timestamp_str = b"".join(reformatted_timestamp) # pyrfc3339 uses "native" strings. That is, bytes on Python 2 and unicode # on Python 3 if six.PY3: timestamp_str = timestamp_str.decode('ascii') return pyrfc3339.parse(timestamp_str)
[docs]def sha256sum(filename): """Compute a sha256sum of a file. NB: In given file, platform specific newlines characters will be converted into their equivalent unicode counterparts before calculating the hash. :param str filename: path to the file whose hash will be computed :returns: sha256 digest of the file in hexadecimal :rtype: str """ sha256 = hashlib.sha256() with open(filename, 'r') as file_d: sha256.update(file_d.read().encode('UTF-8')) return sha256.hexdigest()
[docs]def cert_and_chain_from_fullchain(fullchain_pem): """Split fullchain_pem into cert_pem and chain_pem :param str fullchain_pem: concatenated cert + chain :returns: tuple of string cert_pem and chain_pem :rtype: tuple """ cert = crypto.dump_certificate(crypto.FILETYPE_PEM, crypto.load_certificate(crypto.FILETYPE_PEM, fullchain_pem)).decode() chain = fullchain_pem[len(cert):].lstrip() return (cert, chain)