#!/usr/bin/env python
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Reads a specification from stdin and outputs a PKCS7 (CMS) message with
the desired properties.

The specification format is as follows:

sha1:<hex string>
sha256:<hex string>
signer:
<pycert specification>

Either or both of sha1 and sha256 may be specified. The value of
each hash directive is what will be put in the messageDigest
attribute of the SignerInfo that corresponds to the signature
algorithm defined by the hash algorithm and key type of the
default key. Together, these comprise the signerInfos field of
the SignedData. If neither hash is specified, the signerInfos
will be an empty SET (i.e. there will be no actual signature
information).
The certificate specification must come last.
"""

from pyasn1.codec.der import decoder
from pyasn1.codec.der import encoder
from pyasn1.type import tag, univ
from pyasn1_modules import rfc2315, rfc2459
import StringIO
import base64
import pycert
import pykey
import sys


class Error(Exception):
    """Base class for exceptions in this module."""
    pass


class UnknownDirectiveError(Error):
    """Helper exception type to handle unknown specification
    directives."""

    def __init__(self, directive):
        super(UnknownDirectiveError, self).__init__()
        self.directive = directive

    def __str__(self):
        return 'Unknown directive %s' % repr(self.directive)


class CMS(object):
    """Utility class for reading a CMS specification and
    generating a CMS message"""

    def __init__(self, paramStream):
        """Parses the specification read from paramStream.

        Lines before the 'signer:' directive must be 'sha1:<hex>' or
        'sha256:<hex>' directives; every line after 'signer:' is
        collected (whitespace-stripped) and handed to pycert.Certificate
        as the signer certificate specification.

        Raises UnknownDirectiveError for any other line that appears
        before 'signer:' (this includes blank lines)."""
        self.sha1 = ''
        self.sha256 = ''
        signerSpecification = StringIO.StringIO()
        readingSignerSpecification = False
        for line in paramStream.readlines():
            # Strip once and match every directive against the stripped
            # text, so that surrounding whitespace is treated the same
            # way for 'signer:', 'sha1:' and 'sha256:'.
            directive = line.strip()
            if readingSignerSpecification:
                signerSpecification.write(directive + '\n')
            elif directive == 'signer:':
                readingSignerSpecification = True
            elif directive.startswith('sha1:'):
                self.sha1 = directive[len('sha1:'):]
            elif directive.startswith('sha256:'):
                self.sha256 = directive[len('sha256:'):]
            else:
                raise UnknownDirectiveError(directive)
        # Rewind so pycert reads the collected specification from the start.
        signerSpecification.seek(0)
        self.signer = pycert.Certificate(signerSpecification)
        self.signingKey = pykey.keyFromSpecification('default')

    def buildAuthenticatedAttributes(self, value, implicitTag=None):
        """Utility function to build a pyasn1 AuthenticatedAttributes
        object. Useful because when building a SignerInfo, the
        authenticatedAttributes needs to be tagged implicitly, but when
        signing an AuthenticatedAttributes, it needs the explicit SET
        tag.

        value is the hex string placed in the messageDigest attribute.
        implicitTag, if given, is the pyasn1 tag applied implicitly to
        the returned Attributes object.

        The result always holds two attributes, in this order:
        contentType (PKCS#7 data) and messageDigest."""
        if implicitTag:
            authenticatedAttributes = rfc2315.Attributes().subtype(
                implicitTag=implicitTag)
        else:
            authenticatedAttributes = rfc2315.Attributes()

        contentTypeAttribute = rfc2315.Attribute()
        # PKCS#9 contentType
        contentTypeAttribute['type'] = univ.ObjectIdentifier(
            '1.2.840.113549.1.9.3')
        contentTypeAttribute['values'] = univ.SetOf(rfc2459.AttributeValue())
        # PKCS#7 data
        contentTypeAttribute['values'][0] = univ.ObjectIdentifier(
            '1.2.840.113549.1.7.1')
        authenticatedAttributes[0] = contentTypeAttribute

        hashAttribute = rfc2315.Attribute()
        # PKCS#9 messageDigest
        hashAttribute['type'] = univ.ObjectIdentifier('1.2.840.113549.1.9.4')
        hashAttribute['values'] = univ.SetOf(rfc2459.AttributeValue())
        hashAttribute['values'][0] = univ.OctetString(hexValue=value)
        authenticatedAttributes[1] = hashAttribute
        return authenticatedAttributes

    def pykeyHashToDigestAlgorithm(self, pykeyHash):
        """Given a pykey hash algorithm identifier, builds an
        AlgorithmIdentifier for use with pyasn1.

        Only pykey.HASH_SHA1 and pykey.HASH_SHA256 are handled; any
        other value raises pykey.UnknownHashAlgorithmError."""
        if pykeyHash == pykey.HASH_SHA1:
            oidString = '1.3.14.3.2.26'
        elif pykeyHash == pykey.HASH_SHA256:
            oidString = '2.16.840.1.101.3.4.2.1'
        else:
            raise pykey.UnknownHashAlgorithmError(pykeyHash)
        algorithmIdentifier = rfc2459.AlgorithmIdentifier()
        algorithmIdentifier['algorithm'] = univ.ObjectIdentifier(oidString)
        # Directly setting parameters to univ.Null doesn't currently work.
        nullEncapsulated = encoder.encode(univ.Null())
        algorithmIdentifier['parameters'] = univ.Any(nullEncapsulated)
        return algorithmIdentifier

    def buildSignerInfo(self, certificate, pykeyHash, digestValue):
        """Given a pyasn1 certificate, a pykey hash identifier
        and a hash value, creates a SignerInfo with the
        appropriate values.

        The issuer is taken from self.signer, the serial number from
        the given pyasn1 certificate. The signature is computed with
        self.signingKey over the DER encoding of the (untagged)
        authenticated attributes."""
        signerInfo = rfc2315.SignerInfo()
        signerInfo['version'] = 1
        issuerAndSerialNumber = rfc2315.IssuerAndSerialNumber()
        issuerAndSerialNumber['issuer'] = self.signer.getIssuer()
        issuerAndSerialNumber['serialNumber'] = \
            certificate['tbsCertificate']['serialNumber']
        signerInfo['issuerAndSerialNumber'] = issuerAndSerialNumber
        signerInfo['digestAlgorithm'] = \
            self.pykeyHashToDigestAlgorithm(pykeyHash)
        rsa = rfc2459.AlgorithmIdentifier()
        rsa['algorithm'] = rfc2459.rsaEncryption
        rsa['parameters'] = univ.Null()
        # The copy embedded in the SignerInfo carries the implicit [0]
        # tag; the copy that gets signed keeps the plain SET tag.
        authenticatedAttributes = self.buildAuthenticatedAttributes(
            digestValue,
            implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed,
                                0))
        authenticatedAttributesTBS = self.buildAuthenticatedAttributes(
            digestValue)
        signerInfo['authenticatedAttributes'] = authenticatedAttributes
        signerInfo['digestEncryptionAlgorithm'] = rsa
        authenticatedAttributesEncoded = encoder.encode(
            authenticatedAttributesTBS)
        signature = self.signingKey.sign(authenticatedAttributesEncoded,
                                         pykeyHash)
        # signature will be a hexified bit string of the form
        # "'<hex bytes>'H". For some reason that's what BitString wants,
        # but since this is an OCTET STRING, we have to strip off the
        # quotation marks and trailing "H".
        signerInfo['encryptedDigest'] = univ.OctetString(
            hexValue=signature[1:-2])
        return signerInfo

    def toDER(self):
        """Builds the CMS message and returns its DER encoding.

        The result is a ContentInfo of type signedData wrapping a
        SignedData that contains the signer certificate and one
        SignerInfo for each of sha1/sha256 that was specified (none if
        neither was)."""
        contentInfo = rfc2315.ContentInfo()
        contentInfo['contentType'] = rfc2315.signedData

        signedData = rfc2315.SignedData()
        signedData['version'] = rfc2315.Version(1)

        # NOTE(review): only SHA-1 is listed in digestAlgorithms, even
        # when a sha256 SignerInfo is added below - presumably
        # intentional for the consumers of this output; confirm before
        # changing.
        digestAlgorithms = rfc2315.DigestAlgorithmIdentifiers()
        digestAlgorithms[0] = self.pykeyHashToDigestAlgorithm(pykey.HASH_SHA1)
        signedData['digestAlgorithms'] = digestAlgorithms

        dataContentInfo = rfc2315.ContentInfo()
        dataContentInfo['contentType'] = rfc2315.data
        signedData['contentInfo'] = dataContentInfo

        certificates = rfc2315.ExtendedCertificatesAndCertificates().subtype(
            implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed,
                                0))
        extendedCertificateOrCertificate = \
            rfc2315.ExtendedCertificateOrCertificate()
        # Round-trip the pycert certificate through DER to obtain a
        # pyasn1 Certificate object.
        certificate = decoder.decode(self.signer.toDER(),
                                     asn1Spec=rfc2459.Certificate())[0]
        extendedCertificateOrCertificate['certificate'] = certificate
        certificates[0] = extendedCertificateOrCertificate
        signedData['certificates'] = certificates

        signerInfos = rfc2315.SignerInfos()

        # Append at index len(signerInfos) so the sha256 entry lands at
        # position 0 when no sha1 entry was added.
        if len(self.sha1) > 0:
            signerInfos[len(signerInfos)] = self.buildSignerInfo(
                certificate, pykey.HASH_SHA1, self.sha1)
        if len(self.sha256) > 0:
            signerInfos[len(signerInfos)] = self.buildSignerInfo(
                certificate, pykey.HASH_SHA256, self.sha256)
        signedData['signerInfos'] = signerInfos

        encoded = encoder.encode(signedData)
        anyTag = univ.Any(encoded).subtype(
            explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed,
                                0))

        contentInfo['content'] = anyTag
        return encoder.encode(contentInfo)

    def toPEM(self):
        """Returns the message from toDER() as PEM text: a
        '-----BEGIN PKCS7-----' line, the base64 data wrapped at 64
        characters per line, and a '-----END PKCS7-----' line followed
        by a newline."""
        b64 = base64.b64encode(self.toDER())
        pemLines = ['-----BEGIN PKCS7-----']
        pemLines.extend(b64[start:start + 64]
                        for start in range(0, len(b64), 64))
        pemLines.append('-----END PKCS7-----\n')
        return '\n'.join(pemLines)


# When run as a standalone program, this will read a specification from
# stdin and output the CMS message as PEM to stdout.
if __name__ == '__main__':
    # toPEM() already ends with a newline; one more is written after it,
    # which is exactly what a `print` of the same string would emit.
    sys.stdout.write(CMS(sys.stdin).toPEM() + '\n')