summaryrefslogtreecommitdiffstats
path: root/security/manager/ssl/tests/unit/pycert.py
diff options
context:
space:
mode:
Diffstat (limited to 'security/manager/ssl/tests/unit/pycert.py')
-rwxr-xr-xsecurity/manager/ssl/tests/unit/pycert.py697
1 files changed, 697 insertions, 0 deletions
diff --git a/security/manager/ssl/tests/unit/pycert.py b/security/manager/ssl/tests/unit/pycert.py
new file mode 100755
index 000000000..9c8efa915
--- /dev/null
+++ b/security/manager/ssl/tests/unit/pycert.py
@@ -0,0 +1,697 @@
+#!/usr/bin/env python
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""
+Reads a certificate specification from stdin or a file and outputs a
+signed x509 certificate with the desired properties.
+
+The input format is as follows:
+
+issuer:<issuer distinguished name specification>
+subject:<subject distinguished name specification>
+[version:{1,2,3,4}]
+[validity:<YYYYMMDD-YYYYMMDD|duration in days>]
+[issuerKey:<key specification>]
+[subjectKey:<key specification>]
+[signature:{sha256WithRSAEncryption,sha1WithRSAEncryption,
+ md5WithRSAEncryption,ecdsaWithSHA256}]
+[serialNumber:<integer in the interval [1, 127]>]
+[extension:<extension name:<extension-specific data>>]
+[...]
+
+Known extensions are:
+basicConstraints:[cA],[pathLenConstraint]
+keyUsage:[digitalSignature,nonRepudiation,keyEncipherment,
+ dataEncipherment,keyAgreement,keyCertSign,cRLSign]
+extKeyUsage:[serverAuth,clientAuth,codeSigning,emailProtection
+ nsSGC, # Netscape Server Gated Crypto
+ OCSPSigning,timeStamping]
+subjectAlternativeName:[<dNSName|directoryName>,...]
+authorityInformationAccess:<OCSP URI>
+certificatePolicies:[<policy OID>,...]
+nameConstraints:{permitted,excluded}:[<dNSName|directoryName>,...]
+nsCertType:sslServer
+TLSFeature:[<TLSFeature>,...]
+
+Where:
+ [] indicates an optional field or component of a field
+ <> indicates a required component of a field
+ {} indicates a choice of exactly one value among a set of values
+ [a,b,c] indicates a list of potential values, of which zero or more
+ may be used
+
+For instance, the version field is optional. However, if it is
+specified, it must have exactly one value from the set {1,2,3,4}.
+
+Most fields have reasonable default values. By default one shared RSA
+key is used for all signatures and subject public key information
+fields. Using "issuerKey:<key specification>" or
+"subjectKey:<key specification>" causes a different key be used for
+signing or as the subject public key information field, respectively.
+See pykey.py for the list of available specifications.
+The signature algorithm is sha256WithRSAEncryption by default.
+
+The validity period may be specified as either concrete notBefore and
+notAfter values or as a validity period centered around 'now'. For the
+latter, this will result in a notBefore of 'now' - duration/2 and a
+notAfter of 'now' + duration/2.
+
+Issuer and subject distinguished name specifications are of the form
+'[stringEncoding]/C=XX/O=Example/CN=example.com'. C (country name), ST
+(state or province name), L (locality name), O (organization name), OU
+(organizational unit name), CN (common name) and emailAddress (email
+address) are currently supported. The optional stringEncoding field may
+be 'utf8String' or 'printableString'. If the given string does not
+contain a '/', it is assumed to represent a common name. If an empty
+string is provided, then an empty distinguished name is returned.
+DirectoryNames also use this format. When specifying a directoryName in
+a nameConstraints extension, the implicit form may not be used.
+
+If an extension name has '[critical]' after it, it will be marked as
+critical. Otherwise (by default), it will not be marked as critical.
+
+TLSFeature values can either consist of a named value (currently only
+'OCSPMustStaple' which corresponds to status_request) or a numeric TLS
+feature value (see rfc7633 for more information).
+
+If a serial number is not explicitly specified, it is automatically
+generated based on the contents of the certificate.
+"""
+
+from pyasn1.codec.der import decoder
+from pyasn1.codec.der import encoder
+from pyasn1.type import constraint, namedtype, tag, univ, useful
+from pyasn1_modules import rfc2459
+import base64
+import datetime
+import hashlib
+import re
+import sys
+
+import pykey
+
+# The GeneralSubtree definition in pyasn1_modules.rfc2459 is incorrect.
+# Where this definition uses a DefaultedNamedType, pyasn1_modules uses
+# a NamedType, which results in the default value being explicitly
+# encoded, which is incorrect for DER.
+class GeneralSubtree(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('base', rfc2459.GeneralName()),
+ namedtype.DefaultedNamedType('minimum', rfc2459.BaseDistance(0).subtype(
+ implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))),
+ namedtype.OptionalNamedType('maximum', rfc2459.BaseDistance().subtype(
+ implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)))
+ )
+
+
+# The NameConstraints definition in pyasn1_modules.rfc2459 is incorrect.
+# excludedSubtrees has a tag value of 1, not 0.
+class NameConstraints(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.OptionalNamedType('permittedSubtrees', rfc2459.GeneralSubtrees().subtype(
+ implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))),
+ namedtype.OptionalNamedType('excludedSubtrees', rfc2459.GeneralSubtrees().subtype(
+ implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)))
+ )
+
+
+class Error(Exception):
+ """Base class for exceptions in this module."""
+ pass
+
+
+class UnknownBaseError(Error):
+ """Base class for handling unexpected input in this module."""
+ def __init__(self, value):
+ super(UnknownBaseError, self).__init__()
+ self.value = value
+ self.category = 'input'
+
+ def __str__(self):
+ return 'Unknown %s type "%s"' % (self.category, repr(self.value))
+
+
+class UnknownAlgorithmTypeError(UnknownBaseError):
+ """Helper exception type to handle unknown algorithm types."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'algorithm'
+
+
+class UnknownParameterTypeError(UnknownBaseError):
+ """Helper exception type to handle unknown input parameters."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'parameter'
+
+
+class UnknownExtensionTypeError(UnknownBaseError):
+ """Helper exception type to handle unknown input extensions."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'extension'
+
+
+class UnknownKeyPurposeTypeError(UnknownBaseError):
+ """Helper exception type to handle unknown key purposes."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'keyPurpose'
+
+
+class UnknownKeyTargetError(UnknownBaseError):
+ """Helper exception type to handle unknown key targets."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'key target'
+
+
+class UnknownVersionError(UnknownBaseError):
+ """Helper exception type to handle unknown specified versions."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'version'
+
+
+class UnknownNameConstraintsSpecificationError(UnknownBaseError):
+ """Helper exception type to handle unknown specified
+ nameConstraints."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'nameConstraints specification'
+
+
+class UnknownDNTypeError(UnknownBaseError):
+ """Helper exception type to handle unknown DN types."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'DN'
+
+
+class UnknownNSCertTypeError(UnknownBaseError):
+ """Helper exception type to handle unknown nsCertType types."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'nsCertType'
+
+
+class UnknownTLSFeature(UnknownBaseError):
+ """Helper exception type to handle unknown TLS Features."""
+
+ def __init__(self, value):
+ UnknownBaseError.__init__(self, value)
+ self.category = 'TLSFeature'
+
+
+class InvalidSerialNumber(Error):
+ """Exception type to handle invalid serial numbers."""
+
+ def __init__(self, value):
+ super(InvalidSerialNumber, self).__init__()
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+
+def getASN1Tag(asn1Type):
+ """Helper function for returning the base tag value of a given
+ type from the pyasn1 package"""
+ return asn1Type.baseTagSet.getBaseTag().asTuple()[2]
+
+def stringToAccessDescription(string):
+ """Helper function that takes a string representing a URI
+ presumably identifying an OCSP authority information access
+ location. Returns an AccessDescription usable by pyasn1."""
+ accessMethod = rfc2459.id_ad_ocsp
+ accessLocation = rfc2459.GeneralName()
+ accessLocation.setComponentByName('uniformResourceIdentifier', string)
+ sequence = univ.Sequence()
+ sequence.setComponentByPosition(0, accessMethod)
+ sequence.setComponentByPosition(1, accessLocation)
+ return sequence
+
+def stringToDN(string, tag=None):
+ """Takes a string representing a distinguished name or directory
+ name and returns a Name for use by pyasn1. See the documentation
+ for the issuer and subject fields for more details. Takes an
+ optional implicit tag in cases where the Name needs to be tagged
+ differently."""
+ if string and '/' not in string:
+ string = '/CN=%s' % string
+ rdns = rfc2459.RDNSequence()
+ pattern = '/(C|ST|L|O|OU|CN|emailAddress)='
+ split = re.split(pattern, string)
+ # split should now be [[encoding], <type>, <value>, <type>, <value>, ...]
+ if split[0]:
+ encoding = split[0]
+ else:
+ encoding = 'utf8String'
+ for pos, (nameType, value) in enumerate(zip(split[1::2], split[2::2])):
+ ava = rfc2459.AttributeTypeAndValue()
+ if nameType == 'C':
+ ava.setComponentByName('type', rfc2459.id_at_countryName)
+ nameComponent = rfc2459.X520countryName(value)
+ elif nameType == 'ST':
+ ava.setComponentByName('type', rfc2459.id_at_stateOrProvinceName)
+ nameComponent = rfc2459.X520StateOrProvinceName()
+ elif nameType == 'L':
+ ava.setComponentByName('type', rfc2459.id_at_localityName)
+ nameComponent = rfc2459.X520LocalityName()
+ elif nameType == 'O':
+ ava.setComponentByName('type', rfc2459.id_at_organizationName)
+ nameComponent = rfc2459.X520OrganizationName()
+ elif nameType == 'OU':
+ ava.setComponentByName('type', rfc2459.id_at_organizationalUnitName)
+ nameComponent = rfc2459.X520OrganizationalUnitName()
+ elif nameType == 'CN':
+ ava.setComponentByName('type', rfc2459.id_at_commonName)
+ nameComponent = rfc2459.X520CommonName()
+ elif nameType == 'emailAddress':
+ ava.setComponentByName('type', rfc2459.emailAddress)
+ nameComponent = rfc2459.Pkcs9email(value)
+ else:
+ raise UnknownDNTypeError(nameType)
+ if not nameType == 'C' and not nameType == 'emailAddress':
+ # The value may have things like '\0' (i.e. a slash followed by
+ # the number zero) that have to be decoded into the resulting
+ # '\x00' (i.e. a byte with value zero).
+ nameComponent.setComponentByName(encoding, value.decode(encoding='string_escape'))
+ ava.setComponentByName('value', nameComponent)
+ rdn = rfc2459.RelativeDistinguishedName()
+ rdn.setComponentByPosition(0, ava)
+ rdns.setComponentByPosition(pos, rdn)
+ if tag:
+ name = rfc2459.Name().subtype(implicitTag=tag)
+ else:
+ name = rfc2459.Name()
+ name.setComponentByPosition(0, rdns)
+ return name
+
+def stringToAlgorithmIdentifiers(string):
+ """Helper function that converts a description of an algorithm
+ to a representation usable by the pyasn1 package and a hash
+ algorithm name for use by pykey."""
+ algorithmIdentifier = rfc2459.AlgorithmIdentifier()
+ algorithmName = None
+ algorithm = None
+ if string == 'sha1WithRSAEncryption':
+ algorithmName = 'SHA-1'
+ algorithm = rfc2459.sha1WithRSAEncryption
+ elif string == 'sha256WithRSAEncryption':
+ algorithmName = 'SHA-256'
+ algorithm = univ.ObjectIdentifier('1.2.840.113549.1.1.11')
+ elif string == 'md5WithRSAEncryption':
+ algorithmName = 'MD5'
+ algorithm = rfc2459.md5WithRSAEncryption
+ elif string == 'ecdsaWithSHA256':
+ algorithmName = 'sha256'
+ algorithm = univ.ObjectIdentifier('1.2.840.10045.4.3.2')
+ else:
+ raise UnknownAlgorithmTypeError(string)
+ algorithmIdentifier.setComponentByName('algorithm', algorithm)
+ return (algorithmIdentifier, algorithmName)
+
+def datetimeToTime(dt):
+ """Takes a datetime object and returns an rfc2459.Time object with
+ that time as its value as a GeneralizedTime"""
+ time = rfc2459.Time()
+ time.setComponentByName('generalTime', useful.GeneralizedTime(dt.strftime('%Y%m%d%H%M%SZ')))
+ return time
+
+def serialBytesToString(serialBytes):
+ """Takes a list of integers in the interval [0, 255] and returns
+ the corresponding serial number string."""
+ serialBytesLen = len(serialBytes)
+ if serialBytesLen > 127:
+ raise InvalidSerialNumber("{} bytes is too long".format(serialBytesLen))
+ # Prepend the ASN.1 INTEGER tag and length bytes.
+ stringBytes = [getASN1Tag(univ.Integer), serialBytesLen] + serialBytes
+ return ''.join(chr(b) for b in stringBytes)
+
+class Certificate(object):
+ """Utility class for reading a certificate specification and
+ generating a signed x509 certificate"""
+
+ def __init__(self, paramStream):
+ self.versionValue = 2 # a value of 2 is X509v3
+ self.signature = 'sha256WithRSAEncryption'
+ self.issuer = 'Default Issuer'
+ actualNow = datetime.datetime.utcnow()
+ self.now = datetime.datetime.strptime(str(actualNow.year), '%Y')
+ aYearAndAWhile = datetime.timedelta(days=400)
+ self.notBefore = self.now - aYearAndAWhile
+ self.notAfter = self.now + aYearAndAWhile
+ self.subject = 'Default Subject'
+ self.extensions = None
+ self.subjectKey = pykey.keyFromSpecification('default')
+ self.issuerKey = pykey.keyFromSpecification('default')
+ self.serialNumber = None
+ self.decodeParams(paramStream)
+ # If a serial number wasn't specified, generate one based on
+ # the certificate contents.
+ if not self.serialNumber:
+ self.serialNumber = self.generateSerialNumber()
+
+ def generateSerialNumber(self):
+ """Generates a serial number for this certificate based on its
+ contents. Intended to be reproducible for compatibility with
+ the build system on OS X (see the comment above main, later in
+ this file)."""
+ hasher = hashlib.sha256()
+ hasher.update(str(self.versionValue))
+ hasher.update(self.signature)
+ hasher.update(self.issuer)
+ hasher.update(str(self.notBefore))
+ hasher.update(str(self.notAfter))
+ hasher.update(self.subject)
+ if self.extensions:
+ for extension in self.extensions:
+ hasher.update(str(extension))
+ serialBytes = [ord(c) for c in hasher.digest()[:20]]
+ # Ensure that the most significant bit isn't set (which would
+ # indicate a negative number, which isn't valid for serial
+ # numbers).
+ serialBytes[0] &= 0x7f
+ # Also ensure that the least significant bit on the most
+ # significant byte is set (to prevent a leading zero byte,
+ # which also wouldn't be valid).
+ serialBytes[0] |= 0x01
+ return serialBytesToString(serialBytes)
+
+ def decodeParams(self, paramStream):
+ for line in paramStream.readlines():
+ self.decodeParam(line.strip())
+
+ def decodeParam(self, line):
+ param = line.split(':')[0]
+ value = ':'.join(line.split(':')[1:])
+ if param == 'version':
+ self.setVersion(value)
+ elif param == 'subject':
+ self.subject = value
+ elif param == 'issuer':
+ self.issuer = value
+ elif param == 'validity':
+ self.decodeValidity(value)
+ elif param == 'extension':
+ self.decodeExtension(value)
+ elif param == 'issuerKey':
+ self.setupKey('issuer', value)
+ elif param == 'subjectKey':
+ self.setupKey('subject', value)
+ elif param == 'signature':
+ self.signature = value
+ elif param == 'serialNumber':
+ serialNumber = int(value)
+ # Ensure only serial numbers that conform to the rules listed in
+ # generateSerialNumber() are permitted.
+ if serialNumber < 1 or serialNumber > 127:
+ raise InvalidSerialNumber(value)
+ self.serialNumber = serialBytesToString([serialNumber])
+ else:
+ raise UnknownParameterTypeError(param)
+
+ def setVersion(self, version):
+ intVersion = int(version)
+ if intVersion >= 1 and intVersion <= 4:
+ self.versionValue = intVersion - 1
+ else:
+ raise UnknownVersionError(version)
+
+ def decodeValidity(self, duration):
+ match = re.search('([0-9]{8})-([0-9]{8})', duration)
+ if match:
+ self.notBefore = datetime.datetime.strptime(match.group(1), '%Y%m%d')
+ self.notAfter = datetime.datetime.strptime(match.group(2), '%Y%m%d')
+ else:
+ delta = datetime.timedelta(days=(int(duration) / 2))
+ self.notBefore = self.now - delta
+ self.notAfter = self.now + delta
+
+ def decodeExtension(self, extension):
+ match = re.search(r'([a-zA-Z]+)(\[critical\])?:(.*)', extension)
+ if not match:
+ raise UnknownExtensionTypeError(extension)
+ extensionType = match.group(1)
+ critical = match.group(2)
+ value = match.group(3)
+ if extensionType == 'basicConstraints':
+ self.addBasicConstraints(value, critical)
+ elif extensionType == 'keyUsage':
+ self.addKeyUsage(value, critical)
+ elif extensionType == 'extKeyUsage':
+ self.addExtKeyUsage(value, critical)
+ elif extensionType == 'subjectAlternativeName':
+ self.addSubjectAlternativeName(value, critical)
+ elif extensionType == 'authorityInformationAccess':
+ self.addAuthorityInformationAccess(value, critical)
+ elif extensionType == 'certificatePolicies':
+ self.addCertificatePolicies(value, critical)
+ elif extensionType == 'nameConstraints':
+ self.addNameConstraints(value, critical)
+ elif extensionType == 'nsCertType':
+ self.addNSCertType(value, critical)
+ elif extensionType == 'TLSFeature':
+ self.addTLSFeature(value, critical)
+ else:
+ raise UnknownExtensionTypeError(extensionType)
+
+ def setupKey(self, subjectOrIssuer, value):
+ if subjectOrIssuer == 'subject':
+ self.subjectKey = pykey.keyFromSpecification(value)
+ elif subjectOrIssuer == 'issuer':
+ self.issuerKey = pykey.keyFromSpecification(value)
+ else:
+ raise UnknownKeyTargetError(subjectOrIssuer)
+
+ def addExtension(self, extensionType, extensionValue, critical):
+ if not self.extensions:
+ self.extensions = []
+ encapsulated = univ.OctetString(encoder.encode(extensionValue))
+ extension = rfc2459.Extension()
+ extension.setComponentByName('extnID', extensionType)
+ # critical is either the string '[critical]' or None.
+ # We only care whether or not it is truthy.
+ if critical:
+ extension.setComponentByName('critical', True)
+ extension.setComponentByName('extnValue', encapsulated)
+ self.extensions.append(extension)
+
+ def addBasicConstraints(self, basicConstraints, critical):
+ cA = basicConstraints.split(',')[0]
+ pathLenConstraint = basicConstraints.split(',')[1]
+ basicConstraintsExtension = rfc2459.BasicConstraints()
+ basicConstraintsExtension.setComponentByName('cA', cA == 'cA')
+ if pathLenConstraint:
+ pathLenConstraintValue = \
+ univ.Integer(int(pathLenConstraint)).subtype(
+ subtypeSpec=constraint.ValueRangeConstraint(0, 64))
+ basicConstraintsExtension.setComponentByName('pathLenConstraint',
+ pathLenConstraintValue)
+ self.addExtension(rfc2459.id_ce_basicConstraints, basicConstraintsExtension, critical)
+
+ def addKeyUsage(self, keyUsage, critical):
+ keyUsageExtension = rfc2459.KeyUsage(keyUsage)
+ self.addExtension(rfc2459.id_ce_keyUsage, keyUsageExtension, critical)
+
+ def keyPurposeToOID(self, keyPurpose):
+ if keyPurpose == 'serverAuth':
+ # the OID for id_kp_serverAuth is incorrect in the
+ # pyasn1-modules implementation
+ return univ.ObjectIdentifier('1.3.6.1.5.5.7.3.1')
+ if keyPurpose == 'clientAuth':
+ return rfc2459.id_kp_clientAuth
+ if keyPurpose == 'codeSigning':
+ return rfc2459.id_kp_codeSigning
+ if keyPurpose == 'emailProtection':
+ return rfc2459.id_kp_emailProtection
+ if keyPurpose == 'nsSGC':
+ return univ.ObjectIdentifier('2.16.840.1.113730.4.1')
+ if keyPurpose == 'OCSPSigning':
+ return univ.ObjectIdentifier('1.3.6.1.5.5.7.3.9')
+ if keyPurpose == 'timeStamping':
+ return rfc2459.id_kp_timeStamping
+ raise UnknownKeyPurposeTypeError(keyPurpose)
+
+ def addExtKeyUsage(self, extKeyUsage, critical):
+ extKeyUsageExtension = rfc2459.ExtKeyUsageSyntax()
+ for count, keyPurpose in enumerate(extKeyUsage.split(',')):
+ extKeyUsageExtension.setComponentByPosition(count, self.keyPurposeToOID(keyPurpose))
+ self.addExtension(rfc2459.id_ce_extKeyUsage, extKeyUsageExtension, critical)
+
+ def addSubjectAlternativeName(self, names, critical):
+ subjectAlternativeName = rfc2459.SubjectAltName()
+ for count, name in enumerate(names.split(',')):
+ generalName = rfc2459.GeneralName()
+ if '/' in name:
+ directoryName = stringToDN(name,
+ tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4))
+ generalName.setComponentByName('directoryName', directoryName)
+ else:
+ # The string may have things like '\0' (i.e. a slash
+ # followed by the number zero) that have to be decoded into
+ # the resulting '\x00' (i.e. a byte with value zero).
+ generalName.setComponentByName('dNSName', name.decode(encoding='string_escape'))
+ subjectAlternativeName.setComponentByPosition(count, generalName)
+ self.addExtension(rfc2459.id_ce_subjectAltName, subjectAlternativeName, critical)
+
+ def addAuthorityInformationAccess(self, ocspURI, critical):
+ sequence = univ.Sequence()
+ accessDescription = stringToAccessDescription(ocspURI)
+ sequence.setComponentByPosition(0, accessDescription)
+ self.addExtension(rfc2459.id_pe_authorityInfoAccess, sequence, critical)
+
+ def addCertificatePolicies(self, policyOIDs, critical):
+ policies = rfc2459.CertificatePolicies()
+ for pos, policyOID in enumerate(policyOIDs.split(',')):
+ if policyOID == 'any':
+ policyOID = '2.5.29.32.0'
+ policy = rfc2459.PolicyInformation()
+ policyIdentifier = rfc2459.CertPolicyId(policyOID)
+ policy.setComponentByName('policyIdentifier', policyIdentifier)
+ policies.setComponentByPosition(pos, policy)
+ self.addExtension(rfc2459.id_ce_certificatePolicies, policies, critical)
+
+ def addNameConstraints(self, constraints, critical):
+ nameConstraints = NameConstraints()
+ if constraints.startswith('permitted:'):
+ (subtreesType, subtreesTag) = ('permittedSubtrees', 0)
+ elif constraints.startswith('excluded:'):
+ (subtreesType, subtreesTag) = ('excludedSubtrees', 1)
+ else:
+ raise UnknownNameConstraintsSpecificationError(constraints)
+ generalSubtrees = rfc2459.GeneralSubtrees().subtype(
+ implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, subtreesTag))
+ subtrees = constraints[(constraints.find(':') + 1):]
+ for pos, name in enumerate(subtrees.split(',')):
+ generalName = rfc2459.GeneralName()
+ if '/' in name:
+ directoryName = stringToDN(name,
+ tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4))
+ generalName.setComponentByName('directoryName', directoryName)
+ else:
+ generalName.setComponentByName('dNSName', name)
+ generalSubtree = GeneralSubtree()
+ generalSubtree.setComponentByName('base', generalName)
+ generalSubtrees.setComponentByPosition(pos, generalSubtree)
+ nameConstraints.setComponentByName(subtreesType, generalSubtrees)
+ self.addExtension(rfc2459.id_ce_nameConstraints, nameConstraints, critical)
+
+ def addNSCertType(self, certType, critical):
+ if certType != 'sslServer':
+ raise UnknownNSCertTypeError(certType)
+ self.addExtension(univ.ObjectIdentifier('2.16.840.1.113730.1.1'), univ.BitString("'01'B"),
+ critical)
+
+ def addTLSFeature(self, features, critical):
+ namedFeatures = {'OCSPMustStaple': 5}
+ featureList = [f.strip() for f in features.split(',')]
+ sequence = univ.Sequence()
+ for feature in featureList:
+ featureValue = 0
+ try:
+ featureValue = int(feature)
+ except ValueError:
+ try:
+ featureValue = namedFeatures[feature]
+ except:
+ raise UnknownTLSFeature(feature)
+ sequence.setComponentByPosition(len(sequence),
+ univ.Integer(featureValue))
+ self.addExtension(univ.ObjectIdentifier('1.3.6.1.5.5.7.1.24'), sequence,
+ critical)
+
+ def getVersion(self):
+ return rfc2459.Version(self.versionValue).subtype(
+ explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))
+
+ def getSerialNumber(self):
+ return decoder.decode(self.serialNumber)[0]
+
+ def getIssuer(self):
+ return stringToDN(self.issuer)
+
+ def getValidity(self):
+ validity = rfc2459.Validity()
+ validity.setComponentByName('notBefore', self.getNotBefore())
+ validity.setComponentByName('notAfter', self.getNotAfter())
+ return validity
+
+ def getNotBefore(self):
+ return datetimeToTime(self.notBefore)
+
+ def getNotAfter(self):
+ return datetimeToTime(self.notAfter)
+
+ def getSubject(self):
+ return stringToDN(self.subject)
+
+ def toDER(self):
+ (signatureOID, hashName) = stringToAlgorithmIdentifiers(self.signature)
+ tbsCertificate = rfc2459.TBSCertificate()
+ tbsCertificate.setComponentByName('version', self.getVersion())
+ tbsCertificate.setComponentByName('serialNumber', self.getSerialNumber())
+ tbsCertificate.setComponentByName('signature', signatureOID)
+ tbsCertificate.setComponentByName('issuer', self.getIssuer())
+ tbsCertificate.setComponentByName('validity', self.getValidity())
+ tbsCertificate.setComponentByName('subject', self.getSubject())
+ tbsCertificate.setComponentByName('subjectPublicKeyInfo',
+ self.subjectKey.asSubjectPublicKeyInfo())
+ if self.extensions:
+ extensions = rfc2459.Extensions().subtype(
+ explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))
+ for count, extension in enumerate(self.extensions):
+ extensions.setComponentByPosition(count, extension)
+ tbsCertificate.setComponentByName('extensions', extensions)
+ certificate = rfc2459.Certificate()
+ certificate.setComponentByName('tbsCertificate', tbsCertificate)
+ certificate.setComponentByName('signatureAlgorithm', signatureOID)
+ tbsDER = encoder.encode(tbsCertificate)
+ certificate.setComponentByName('signatureValue', self.issuerKey.sign(tbsDER, hashName))
+ return encoder.encode(certificate)
+
+ def toPEM(self):
+ output = '-----BEGIN CERTIFICATE-----'
+ der = self.toDER()
+ b64 = base64.b64encode(der)
+ while b64:
+ output += '\n' + b64[:64]
+ b64 = b64[64:]
+ output += '\n-----END CERTIFICATE-----'
+ return output
+
+
+# The build harness will call this function with an output
+# file-like object and a path to a file containing a
+# specification. This will read the specification and output
+# the certificate as PEM.
+# This utility tries as hard as possible to ensure that two
+# runs with the same input will have the same output. This is
+# particularly important when building on OS X, where we
+# generate everything twice for unified builds. During the
+# unification step, if any pair of input files differ, the build
+# system throws an error.
+# The one concrete failure mode is if one run happens before
+# midnight on New Year's Eve and the next run happens after
+# midnight.
+def main(output, inputPath):
+ with open(inputPath) as configStream:
+ output.write(Certificate(configStream).toPEM())
+
+# When run as a standalone program, this will read a specification from
+# stdin and output the certificate as PEM to stdout.
+if __name__ == '__main__':
+ print Certificate(sys.stdin).toPEM()