1#!/usr/bin/env python
2#
3# This Source Code Form is subject to the terms of the Mozilla Public
4# License, v. 2.0. If a copy of the MPL was not distributed with this
5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7"""
8Reads a specification from stdin and outputs a PKCS7 (CMS) message with
9the desired properties.
10
11The specification format is as follows:
12
13sha1:<hex string>
14sha256:<hex string>
15signer:
16<pycert specification>
17
Either or both of sha1 and sha256 may be specified. The value of
19each hash directive is what will be put in the messageDigest
20attribute of the SignerInfo that corresponds to the signature
21algorithm defined by the hash algorithm and key type of the
22default key. Together, these comprise the signerInfos field of
23the SignedData. If neither hash is specified, the signerInfos
24will be an empty SET (i.e. there will be no actual signature
25information).
26The certificate specification must come last.
27"""
28
29from pyasn1.codec.der import decoder
30from pyasn1.codec.der import encoder
31from pyasn1.type import tag, univ
32from pyasn1_modules import rfc2315, rfc2459
33import StringIO
34import base64
35import pycert
36import pykey
37import sys
38
class Error(Exception):
    """Root of this module's exception hierarchy.

    Every exception type defined in this module derives from this
    class, so callers can catch it to handle any of them."""
42
43
class UnknownDirectiveError(Error):
    """Helper exception type to handle unknown specification
    directives.

    The offending directive text is available as the `directive`
    attribute and is also stored in `args`."""

    def __init__(self, directive):
        # Forward the directive to the base class so that `args` is
        # populated. Leaving `args` empty makes repr() uninformative and
        # breaks copy/pickle, which re-create the exception by calling
        # the class with `args`.
        super(UnknownDirectiveError, self).__init__(directive)
        self.directive = directive

    def __str__(self):
        """Return a message naming the unrecognised directive."""
        return 'Unknown directive %s' % repr(self.directive)
54
55
class CMS(object):
    """Utility class for reading a CMS specification and
    generating a CMS message.

    Attributes set by the constructor:
      sha1       -- hex string from the 'sha1:' directive ('' if absent)
      sha256     -- hex string from the 'sha256:' directive ('' if absent)
      signer     -- pycert.Certificate built from the lines following
                    the 'signer:' directive
      signingKey -- key returned by pykey.keyFromSpecification('default')
    """

    def __init__(self, paramStream):
        """Parse a CMS specification from the file-like paramStream.

        Lines before the 'signer:' directive must be 'sha1:<hex>' or
        'sha256:<hex>' directives. Every line after 'signer:' is passed
        (stripped of surrounding whitespace) to pycert.Certificate.

        Raises UnknownDirectiveError for any other line that appears
        before 'signer:' (including a blank line)."""
        self.sha1 = ''
        self.sha256 = ''
        signerSpecification = StringIO.StringIO()
        readingSignerSpecification = False
        for rawLine in paramStream.readlines():
            # Strip once and match every directive against the stripped
            # text, so that surrounding whitespace is treated the same
            # way for 'signer:', 'sha1:' and 'sha256:'.
            line = rawLine.strip()
            if readingSignerSpecification:
                signerSpecification.write(line + '\n')
            elif line == 'signer:':
                readingSignerSpecification = True
            elif line.startswith('sha1:'):
                self.sha1 = line[len('sha1:'):]
            elif line.startswith('sha256:'):
                self.sha256 = line[len('sha256:'):]
            else:
                raise UnknownDirectiveError(line)
        # Rewind so pycert reads the accumulated specification from the
        # beginning.
        signerSpecification.seek(0)
        self.signer = pycert.Certificate(signerSpecification)
        self.signingKey = pykey.keyFromSpecification('default')

    def buildAuthenticatedAttributes(self, value, implicitTag=None):
        """Utility function to build a pyasn1 AuthenticatedAttributes
        object. Useful because when building a SignerInfo, the
        authenticatedAttributes needs to be tagged implicitly, but when
        signing an AuthenticatedAttributes, it needs the explicit SET
        tag.

        value is the message digest as a hex string; it becomes the
        OCTET STRING value of the messageDigest attribute.
        implicitTag, if given, is the pyasn1 tag applied implicitly to
        the returned Attributes object.

        The result holds two attributes, in this order: contentType
        (set to PKCS#7 data) and messageDigest."""
        if implicitTag:
            authenticatedAttributes = rfc2315.Attributes().subtype(implicitTag=implicitTag)
        else:
            authenticatedAttributes = rfc2315.Attributes()
        contentTypeAttribute = rfc2315.Attribute()
        # PKCS#9 contentType
        contentTypeAttribute['type'] = univ.ObjectIdentifier('1.2.840.113549.1.9.3')
        contentTypeAttribute['values'] = univ.SetOf(rfc2459.AttributeValue())
        # PKCS#7 data
        contentTypeAttribute['values'][0] = univ.ObjectIdentifier('1.2.840.113549.1.7.1')
        authenticatedAttributes[0] = contentTypeAttribute
        hashAttribute = rfc2315.Attribute()
        # PKCS#9 messageDigest
        hashAttribute['type'] = univ.ObjectIdentifier('1.2.840.113549.1.9.4')
        hashAttribute['values'] = univ.SetOf(rfc2459.AttributeValue())
        hashAttribute['values'][0] = univ.OctetString(hexValue=value)
        authenticatedAttributes[1] = hashAttribute
        return authenticatedAttributes

    def pykeyHashToDigestAlgorithm(self, pykeyHash):
        """Given a pykey hash algorithm identifier, builds an
        AlgorithmIdentifier for use with pyasn1.

        Only pykey.HASH_SHA1 and pykey.HASH_SHA256 are supported; any
        other value raises pykey.UnknownHashAlgorithmError. The
        parameters field is always an encoded ASN.1 NULL."""
        if pykeyHash == pykey.HASH_SHA1:
            oidString = '1.3.14.3.2.26'
        elif pykeyHash == pykey.HASH_SHA256:
            oidString = '2.16.840.1.101.3.4.2.1'
        else:
            raise pykey.UnknownHashAlgorithmError(pykeyHash)
        algorithmIdentifier = rfc2459.AlgorithmIdentifier()
        algorithmIdentifier['algorithm'] = univ.ObjectIdentifier(oidString)
        # Directly setting parameters to univ.Null doesn't currently work.
        nullEncapsulated = encoder.encode(univ.Null())
        algorithmIdentifier['parameters'] = univ.Any(nullEncapsulated)
        return algorithmIdentifier

    def buildSignerInfo(self, certificate, pykeyHash, digestValue):
        """Given a pyasn1 certificate, a pykey hash identifier
        and a hash value, creates a SignerInfo with the
        appropriate values.

        certificate supplies the serial number; the issuer is taken
        from self.signer. digestValue is the hex string placed in the
        messageDigest attribute. The signature is computed by
        self.signingKey over the DER encoding of the authenticated
        attributes in their untagged (plain SET) form."""
        signerInfo = rfc2315.SignerInfo()
        signerInfo['version'] = 1
        issuerAndSerialNumber = rfc2315.IssuerAndSerialNumber()
        issuerAndSerialNumber['issuer'] = self.signer.getIssuer()
        issuerAndSerialNumber['serialNumber'] = certificate['tbsCertificate']['serialNumber']
        signerInfo['issuerAndSerialNumber'] = issuerAndSerialNumber
        signerInfo['digestAlgorithm'] = self.pykeyHashToDigestAlgorithm(pykeyHash)
        # NOTE(review): the digest encryption algorithm is always
        # rsaEncryption here, which presumably assumes the default key
        # is an RSA key -- confirm. Unlike pykeyHashToDigestAlgorithm,
        # the NULL parameters are assigned directly rather than wrapped
        # in univ.Any; confirm this works with the pyasn1 version in use.
        rsa = rfc2459.AlgorithmIdentifier()
        rsa['algorithm'] = rfc2459.rsaEncryption
        rsa['parameters'] = univ.Null()
        # Two copies of the same attributes: the implicitly [0]-tagged
        # one is embedded in the SignerInfo, the untagged one is what
        # gets encoded and signed.
        authenticatedAttributes = self.buildAuthenticatedAttributes(digestValue,
          implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
        authenticatedAttributesTBS = self.buildAuthenticatedAttributes(digestValue)
        signerInfo['authenticatedAttributes'] = authenticatedAttributes
        signerInfo['digestEncryptionAlgorithm'] = rsa
        authenticatedAttributesEncoded = encoder.encode(authenticatedAttributesTBS)
        signature = self.signingKey.sign(authenticatedAttributesEncoded, pykeyHash)
        # signature will be a hexified bit string of the form
        # "'<hex bytes>'H". For some reason that's what BitString wants,
        # but since this is an OCTET STRING, we have to strip off the
        # quotation marks and trailing "H".
        signerInfo['encryptedDigest'] = univ.OctetString(hexValue=signature[1:-2])
        return signerInfo

    def toDER(self):
        """Return the DER encoding of the CMS message.

        The result is a ContentInfo of type signedData whose content is
        a version 1 SignedData containing the signer certificate and
        one SignerInfo for each hash directive that was specified
        (none if neither was)."""
        contentInfo = rfc2315.ContentInfo()
        contentInfo['contentType'] = rfc2315.signedData

        signedData = rfc2315.SignedData()
        signedData['version'] = rfc2315.Version(1)

        # NOTE(review): only SHA-1 is listed in digestAlgorithms, even
        # when a sha256 SignerInfo is emitted below -- presumably
        # intentional for the consumers of these messages; confirm.
        digestAlgorithms = rfc2315.DigestAlgorithmIdentifiers()
        digestAlgorithms[0] = self.pykeyHashToDigestAlgorithm(pykey.HASH_SHA1)
        signedData['digestAlgorithms'] = digestAlgorithms

        # The encapsulated ContentInfo names the 'data' type but no
        # content is set on it.
        dataContentInfo = rfc2315.ContentInfo()
        dataContentInfo['contentType'] = rfc2315.data
        signedData['contentInfo'] = dataContentInfo

        certificates = rfc2315.ExtendedCertificatesAndCertificates().subtype(
            implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
        extendedCertificateOrCertificate = rfc2315.ExtendedCertificateOrCertificate()
        # Round-trip the signer through DER to obtain a pyasn1
        # Certificate object.
        certificate = decoder.decode(self.signer.toDER(),
            asn1Spec=rfc2459.Certificate())[0]
        extendedCertificateOrCertificate['certificate'] = certificate
        certificates[0] = extendedCertificateOrCertificate
        signedData['certificates'] = certificates

        signerInfos = rfc2315.SignerInfos()

        # Append one SignerInfo per specified hash; with neither hash
        # specified, signerInfos stays empty.
        if self.sha1:
            signerInfos[len(signerInfos)] = self.buildSignerInfo(certificate,
                pykey.HASH_SHA1, self.sha1)
        if self.sha256:
            signerInfos[len(signerInfos)] = self.buildSignerInfo(certificate,
                pykey.HASH_SHA256, self.sha256)
        signedData['signerInfos'] = signerInfos

        # Wrap the encoded SignedData in an explicit [0] tag for the
        # outer ContentInfo's content field.
        encoded = encoder.encode(signedData)
        anyTag = univ.Any(encoded).subtype(
            explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))

        contentInfo['content'] = anyTag
        return encoder.encode(contentInfo)

    def toPEM(self):
        """Return the CMS message as a PEM string.

        The output is a '-----BEGIN PKCS7-----' line, the base64 of
        toDER() split into lines of at most 64 characters, and a
        '-----END PKCS7-----' line, each terminated by a newline."""
        b64 = base64.b64encode(self.toDER())
        # b64encode returns bytes where bytes and str are distinct
        # types; normalise to str so the text joins below work either
        # way. Base64 output is pure ASCII.
        if not isinstance(b64, str):
            b64 = b64.decode('ascii')
        lines = ['-----BEGIN PKCS7-----']
        lines.extend(b64[i:i + 64] for i in range(0, len(b64), 64))
        lines.append('-----END PKCS7-----')
        return '\n'.join(lines) + '\n'
199
200
# When run as a standalone program, this will read a specification from
# stdin and output the CMS message as PEM to stdout.
if __name__ == '__main__':
    # Build the message from the specification on stdin, then emit its
    # PEM form followed by a newline on stdout.
    cmsMessage = CMS(sys.stdin)
    sys.stdout.write(cmsMessage.toPEM() + '\n')
205