diff options
Diffstat (limited to 'testing/web-platform/tests/tools/sslutils/openssl.py')
-rw-r--r-- | testing/web-platform/tests/tools/sslutils/openssl.py | 405 |
1 files changed, 405 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/sslutils/openssl.py b/testing/web-platform/tests/tools/sslutils/openssl.py new file mode 100644 index 000000000..26ed71135 --- /dev/null +++ b/testing/web-platform/tests/tools/sslutils/openssl.py @@ -0,0 +1,405 @@ +import functools +import os +import random +import shutil +import subprocess +import tempfile +from datetime import datetime + +class OpenSSL(object): + def __init__(self, logger, binary, base_path, conf_path, hosts, duration, + base_conf_path=None): + """Context manager for interacting with OpenSSL. + Creates a config file for the duration of the context. + + :param logger: stdlib logger or python structured logger + :param binary: path to openssl binary + :param base_path: path to directory for storing certificates + :param conf_path: path for configuration file storing configuration data + :param hosts: list of hosts to include in configuration (or None if not + generating host certificates) + :param duration: Certificate duration in days""" + + self.base_path = base_path + self.binary = binary + self.conf_path = conf_path + self.base_conf_path = base_conf_path + self.logger = logger + self.proc = None + self.cmd = [] + self.hosts = hosts + self.duration = duration + + def __enter__(self): + with open(self.conf_path, "w") as f: + f.write(get_config(self.base_path, self.hosts, self.duration)) + return self + + def __exit__(self, *args, **kwargs): + os.unlink(self.conf_path) + + def log(self, line): + if hasattr(self.logger, "process_output"): + self.logger.process_output(self.proc.pid if self.proc is not None else None, + line.decode("utf8", "replace"), + command=" ".join(self.cmd)) + else: + self.logger.debug(line) + + def __call__(self, cmd, *args, **kwargs): + """Run a command using OpenSSL in the current context. + + :param cmd: The openssl subcommand to run + :param *args: Additional arguments to pass to the command + """ + self.cmd = [self.binary, cmd] + if cmd != "x509": + self.cmd += ["-config", self.conf_path] + self.cmd += list(args) + + env = os.environ.copy() + if self.base_conf_path is not None: + env["OPENSSL_CONF"] = self.base_conf_path.encode("utf8") + + self.proc = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + env=env) + stdout, stderr = self.proc.communicate() + self.log(stdout) + if self.proc.returncode != 0: + raise subprocess.CalledProcessError(self.proc.returncode, self.cmd, + output=stdout) + + self.cmd = [] + self.proc = None + return stdout + + +def make_subject(common_name, + country=None, + state=None, + locality=None, + organization=None, + organization_unit=None): + args = [("country", "C"), + ("state", "ST"), + ("locality", "L"), + ("organization", "O"), + ("organization_unit", "OU"), + ("common_name", "CN")] + + rv = [] + + for var, key in args: + value = locals()[var] + if value is not None: + rv.append("/%s=%s" % (key, value.replace("/", "\\/"))) + + return "".join(rv) + +def make_alt_names(hosts): + rv = [] + for name in hosts: + rv.append("DNS:%s" % name) + return ",".join(rv) + +def get_config(root_dir, hosts, duration=30): + if hosts is None: + san_line = "" + else: + san_line = "subjectAltName = %s" % make_alt_names(hosts) + + if os.path.sep == "\\": + # This seems to be needed for the Shining Light OpenSSL on + # Windows, at least. + root_dir = root_dir.replace("\\", "\\\\") + + rv = """[ ca ] +default_ca = CA_default + +[ CA_default ] +dir = %(root_dir)s +certs = $dir +new_certs_dir = $certs +crl_dir = $dir%(sep)scrl +database = $dir%(sep)sindex.txt +private_key = $dir%(sep)scakey.pem +certificate = $dir%(sep)scacert.pem +serial = $dir%(sep)sserial +crldir = $dir%(sep)scrl +crlnumber = $dir%(sep)scrlnumber +crl = $crldir%(sep)scrl.pem +RANDFILE = $dir%(sep)sprivate%(sep)s.rand +x509_extensions = usr_cert +name_opt = ca_default +cert_opt = ca_default +default_days = %(duration)d +default_crl_days = %(duration)d +default_md = sha256 +preserve = no +policy = policy_anything +copy_extensions = copy + +[ policy_anything ] +countryName = optional +stateOrProvinceName = optional +localityName = optional +organizationName = optional +organizationalUnitName = optional +commonName = supplied +emailAddress = optional + +[ req ] +default_bits = 2048 +default_keyfile = privkey.pem +distinguished_name = req_distinguished_name +attributes = req_attributes +x509_extensions = v3_ca + +# Passwords for private keys if not present they will be prompted for +# input_password = secret +# output_password = secret +string_mask = utf8only +req_extensions = v3_req + +[ req_distinguished_name ] +countryName = Country Name (2 letter code) +countryName_default = AU +countryName_min = 2 +countryName_max = 2 +stateOrProvinceName = State or Province Name (full name) +stateOrProvinceName_default = +localityName = Locality Name (eg, city) +0.organizationName = Organization Name +0.organizationName_default = Web Platform Tests +organizationalUnitName = Organizational Unit Name (eg, section) +#organizationalUnitName_default = +commonName = Common Name (e.g. server FQDN or YOUR name) +commonName_max = 64 +emailAddress = Email Address +emailAddress_max = 64 + +[ req_attributes ] + +[ usr_cert ] +basicConstraints=CA:false +subjectKeyIdentifier=hash +authorityKeyIdentifier=keyid,issuer + +[ v3_req ] +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth +%(san_line)s + +[ v3_ca ] +basicConstraints = CA:true +subjectKeyIdentifier=hash +authorityKeyIdentifier=keyid:always,issuer:always +keyUsage = keyCertSign +""" % {"root_dir": root_dir, + "san_line": san_line, + "duration": duration, + "sep": os.path.sep.replace("\\", "\\\\")} + + return rv + +class OpenSSLEnvironment(object): + ssl_enabled = True + + def __init__(self, logger, openssl_binary="openssl", base_path=None, + password="web-platform-tests", force_regenerate=False, + duration=30, base_conf_path=None): + """SSL environment that creates a local CA and host certificate using OpenSSL. + + By default this will look in base_path for existing certificates that are still + valid and only create new certificates if there aren't any. This behaviour can + be adjusted using the force_regenerate option. + + :param logger: a stdlib logging compatible logger or mozlog structured logger + :param openssl_binary: Path to the OpenSSL binary + :param base_path: Path in which certificates will be stored. If None, a temporary + directory will be used and removed when the server shuts down + :param password: Password to use + :param force_regenerate: Always create a new certificate even if one already exists. + """ + self.logger = logger + + self.temporary = False + if base_path is None: + base_path = tempfile.mkdtemp() + self.temporary = True + + self.base_path = os.path.abspath(base_path) + self.password = password + self.force_regenerate = force_regenerate + self.duration = duration + self.base_conf_path = base_conf_path + + self.path = None + self.binary = openssl_binary + self.openssl = None + + self._ca_cert_path = None + self._ca_key_path = None + self.host_certificates = {} + + def __enter__(self): + if not os.path.exists(self.base_path): + os.makedirs(self.base_path) + + path = functools.partial(os.path.join, self.base_path) + + with open(path("index.txt"), "w"): + pass + with open(path("serial"), "w") as f: + serial = "%x" % random.randint(0, 1000000) + if len(serial) % 2: + serial = "0" + serial + f.write(serial) + + self.path = path + + return self + + def __exit__(self, *args, **kwargs): + if self.temporary: + shutil.rmtree(self.base_path) + + def _config_openssl(self, hosts): + conf_path = self.path("openssl.cfg") + return OpenSSL(self.logger, self.binary, self.base_path, conf_path, hosts, + self.duration, self.base_conf_path) + + def ca_cert_path(self): + """Get the path to the CA certificate file, generating a + new one if needed""" + if self._ca_cert_path is None and not self.force_regenerate: + self._load_ca_cert() + if self._ca_cert_path is None: + self._generate_ca() + return self._ca_cert_path + + def _load_ca_cert(self): + key_path = self.path("cakey.pem") + cert_path = self.path("cacert.pem") + + if self.check_key_cert(key_path, cert_path, None): + self.logger.info("Using existing CA cert") + self._ca_key_path, self._ca_cert_path = key_path, cert_path + + def check_key_cert(self, key_path, cert_path, hosts): + """Check that a key and cert file exist and are valid""" + if not os.path.exists(key_path) or not os.path.exists(cert_path): + return False + + with self._config_openssl(hosts) as openssl: + end_date_str = openssl("x509", + "-noout", + "-enddate", + "-in", cert_path).split("=", 1)[1].strip() + # Not sure if this works in other locales + end_date = datetime.strptime(end_date_str, "%b %d %H:%M:%S %Y %Z") + # Should have some buffer here e.g. 1 hr + if end_date < datetime.now(): + return False + + #TODO: check the key actually signed the cert. + return True + + def _generate_ca(self): + path = self.path + self.logger.info("Generating new CA in %s" % self.base_path) + + key_path = path("cakey.pem") + req_path = path("careq.pem") + cert_path = path("cacert.pem") + + with self._config_openssl(None) as openssl: + openssl("req", + "-batch", + "-new", + "-newkey", "rsa:2048", + "-keyout", key_path, + "-out", req_path, + "-subj", make_subject("web-platform-tests"), + "-passout", "pass:%s" % self.password) + + openssl("ca", + "-batch", + "-create_serial", + "-keyfile", key_path, + "-passin", "pass:%s" % self.password, + "-selfsign", + "-extensions", "v3_ca", + "-in", req_path, + "-out", cert_path) + + os.unlink(req_path) + + self._ca_key_path, self._ca_cert_path = key_path, cert_path + + def host_cert_path(self, hosts): + """Get a tuple of (private key path, certificate path) for a host, + generating new ones if necessary. + + hosts must be a list of all hosts to appear on the certificate, with + the primary hostname first.""" + hosts = tuple(hosts) + if hosts not in self.host_certificates: + if not self.force_regenerate: + key_cert = self._load_host_cert(hosts) + else: + key_cert = None + if key_cert is None: + key, cert = self._generate_host_cert(hosts) + else: + key, cert = key_cert + self.host_certificates[hosts] = key, cert + + return self.host_certificates[hosts] + + def _load_host_cert(self, hosts): + host = hosts[0] + key_path = self.path("%s.key" % host) + cert_path = self.path("%s.pem" % host) + + # TODO: check that this cert was signed by the CA cert + if self.check_key_cert(key_path, cert_path, hosts): + self.logger.info("Using existing host cert") + return key_path, cert_path + + def _generate_host_cert(self, hosts): + host = hosts[0] + if self._ca_key_path is None: + self._generate_ca() + ca_key_path = self._ca_key_path + + assert os.path.exists(ca_key_path) + + path = self.path + + req_path = path("wpt.req") + cert_path = path("%s.pem" % host) + key_path = path("%s.key" % host) + + self.logger.info("Generating new host cert") + + with self._config_openssl(hosts) as openssl: + openssl("req", + "-batch", + "-newkey", "rsa:2048", + "-keyout", key_path, + "-in", ca_key_path, + "-nodes", + "-out", req_path) + + openssl("ca", + "-batch", + "-in", req_path, + "-passin", "pass:%s" % self.password, + "-subj", make_subject(host), + "-out", cert_path) + + os.unlink(req_path) + + return key_path, cert_path |