1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import os
8
9import pytest
10
11from cryptography import x509
12from cryptography.exceptions import _Reasons
13from cryptography.hazmat.primitives import hashes, serialization
14from cryptography.hazmat.primitives.asymmetric import ed25519
15from cryptography.hazmat.primitives.serialization import pkcs7
16
17from .utils import load_vectors_from_file
18from ...utils import raises_unsupported_algorithm
19
20
class TestPKCS7Loading(object):
    """Checks for the PEM and DER PKCS7 certificate loading functions."""

    def test_load_invalid_der_pkcs7(self):
        """Bytes that are not a PKCS7 structure raise ``ValueError``."""
        with pytest.raises(ValueError):
            pkcs7.load_der_pkcs7_certificates(b"nonsense")

    def test_load_invalid_pem_pkcs7(self):
        """Bytes that are not PEM PKCS7 data raise ``ValueError``."""
        with pytest.raises(ValueError):
            pkcs7.load_pem_pkcs7_certificates(b"nonsense")

    def test_not_bytes_der(self):
        """A non-bytes argument to the DER loader raises ``TypeError``."""
        with pytest.raises(TypeError):
            pkcs7.load_der_pkcs7_certificates(38)

    def test_not_bytes_pem(self):
        """A non-bytes argument to the PEM loader raises ``TypeError``."""
        with pytest.raises(TypeError):
            pkcs7.load_pem_pkcs7_certificates(38)

    def test_load_pkcs7_pem(self):
        """The ISRG PEM vector yields a single, correctly named cert."""

        def _read_certs(pemfile):
            return pkcs7.load_pem_pkcs7_certificates(pemfile.read())

        vector_path = os.path.join("pkcs7", "isrg.pem")
        loaded = load_vectors_from_file(vector_path, _read_certs, mode="rb")
        assert len(loaded) == 1

        common_name = x509.oid.NameOID.COMMON_NAME
        found = loaded[0].subject.get_attributes_for_oid(common_name)
        assert found == [x509.NameAttribute(common_name, u"ISRG Root X1")]

    def test_load_pkcs7_der(self):
        """The Amazon roots DER vector yields two certs in a fixed order."""

        def _read_certs(derfile):
            return pkcs7.load_der_pkcs7_certificates(derfile.read())

        vector_path = os.path.join("pkcs7", "amazon-roots.p7b")
        loaded = load_vectors_from_file(vector_path, _read_certs, mode="rb")
        assert len(loaded) == 2

        first = loaded[0].subject.get_attributes_for_oid(
            x509.oid.NameOID.COMMON_NAME
        )
        assert first == [
            x509.NameAttribute(
                x509.oid.NameOID.COMMON_NAME, u"Amazon Root CA 3"
            )
        ]
        second = loaded[1].subject.get_attributes_for_oid(
            x509.oid.NameOID.COMMON_NAME
        )
        assert second == [
            x509.NameAttribute(
                x509.oid.NameOID.COMMON_NAME, u"Amazon Root CA 2"
            )
        ]

    def test_load_pkcs7_unsupported_type(self):
        """Loading the enveloped vector reports unsupported serialization."""

        def _read_certs(pemfile):
            return pkcs7.load_pem_pkcs7_certificates(pemfile.read())

        vector_path = os.path.join("pkcs7", "enveloped.pem")
        with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
            load_vectors_from_file(vector_path, _read_certs, mode="rb")
82
83
# We have no public verification API and won't be adding one until we get
# some requirements from users, so this function exists to give us basic
# verification for the signing tests.
def _pkcs7_verify(encoding, sig, msg, certs, options, backend):
    """Verify a PKCS7 signature through the backend's OpenSSL bindings.

    ``sig`` is parsed as DER or PEM when ``encoding`` is the matching
    ``serialization.Encoding`` member; any other value is read with
    ``SMIME_read_PKCS7``. Every entry of ``certs`` is added to a new
    ``X509_STORE`` that is handed to ``PKCS7_verify``. ``msg`` is passed as
    the content BIO unless it is ``None``. Of ``options`` only
    ``PKCS7Options.Text`` has an effect (it sets ``PKCS7_TEXT``).

    Every failure is reported through ``backend.openssl_assert``; nothing
    is returned.
    """
    ffi = backend._ffi
    lib = backend._lib
    null = ffi.NULL

    # Parse the signature with the reader that matches the encoding.
    signature_bio = backend._bytes_to_bio(sig)
    if encoding is serialization.Encoding.DER:
        parsed = lib.d2i_PKCS7_bio(signature_bio.bio, null)
    elif encoding is serialization.Encoding.PEM:
        parsed = lib.PEM_read_bio_PKCS7(signature_bio.bio, null, null, null)
    else:
        parsed = lib.SMIME_read_PKCS7(signature_bio.bio, null)
    backend.openssl_assert(parsed != null)
    parsed = ffi.gc(parsed, lib.PKCS7_free)

    wants_text = any(opt is pkcs7.PKCS7Options.Text for opt in options)
    verify_flags = lib.PKCS7_TEXT if wants_text else 0

    # Build the store holding the certificates passed in by the caller.
    trust_store = lib.X509_STORE_new()
    backend.openssl_assert(trust_store != null)
    trust_store = ffi.gc(trust_store, lib.X509_STORE_free)
    for trusted in certs:
        added = lib.X509_STORE_add_cert(trust_store, trusted._x509)
        backend.openssl_assert(added == 1)

    if msg is None:
        content = null
    else:
        # Keep the wrapper object referenced while its BIO is in use.
        content_holder = backend._bytes_to_bio(msg)
        content = content_holder.bio
    outcome = lib.PKCS7_verify(
        parsed, null, trust_store, content, null, verify_flags
    )
    backend.openssl_assert(outcome == 1)
127
128
def _load_cert_key():
    """Load the custom CA test vectors and return them as ``(cert, key)``.

    The private key comes from ``ca_key.pem`` (loaded with no password) and
    the certificate from ``ca.pem``, both under ``x509/custom/ca``.
    """

    def _read_key(pemfile):
        return serialization.load_pem_private_key(pemfile.read(), None)

    def _read_cert(pemfile):
        return x509.load_pem_x509_certificate(pemfile.read())

    ca_dir = os.path.join("x509", "custom", "ca")
    key = load_vectors_from_file(
        os.path.join(ca_dir, "ca_key.pem"), _read_key, mode="rb"
    )
    cert = load_vectors_from_file(
        os.path.join(ca_dir, "ca.pem"), loader=_read_cert, mode="rb"
    )
    return cert, key
143
144
class TestPKCS7Builder(object):
    """Checks for ``pkcs7.PKCS7SignatureBuilder``.

    Covers argument validation, rejected option/encoding combinations, and
    the bytes produced by ``sign`` for the encodings and options used here.
    """

    def test_invalid_data(self):
        """Text (non-bytes) data is rejected with ``TypeError``."""
        empty_builder = pkcs7.PKCS7SignatureBuilder()
        with pytest.raises(TypeError):
            empty_builder.set_data(u"not bytes")

    def test_set_data_twice(self):
        """Setting data a second time raises ``ValueError``."""
        with_data = pkcs7.PKCS7SignatureBuilder().set_data(b"test")
        with pytest.raises(ValueError):
            with_data.set_data(b"test")

    def test_sign_no_signer(self):
        """Signing without any signer raises ``ValueError``."""
        unsigned = pkcs7.PKCS7SignatureBuilder().set_data(b"test")
        with pytest.raises(ValueError):
            unsigned.sign(serialization.Encoding.SMIME, [])

    def test_sign_no_data(self):
        """Signing without data raises ``ValueError``."""
        ca_cert, ca_key = _load_cert_key()
        no_data = pkcs7.PKCS7SignatureBuilder().add_signer(
            ca_cert, ca_key, hashes.SHA256()
        )
        with pytest.raises(ValueError):
            no_data.sign(serialization.Encoding.SMIME, [])

    def test_unsupported_hash_alg(self):
        """``SHA512_256`` is rejected by ``add_signer`` with ``TypeError``."""
        ca_cert, ca_key = _load_cert_key()
        with pytest.raises(TypeError):
            pkcs7.PKCS7SignatureBuilder().add_signer(
                ca_cert, ca_key, hashes.SHA512_256()
            )

    def test_not_a_cert(self):
        """A bytes object in place of a certificate raises ``TypeError``."""
        _, ca_key = _load_cert_key()
        with pytest.raises(TypeError):
            pkcs7.PKCS7SignatureBuilder().add_signer(
                b"notacert", ca_key, hashes.SHA256()
            )

    @pytest.mark.supported(
        only_if=lambda backend: backend.ed25519_supported(),
        skip_message="Does not support ed25519.",
    )
    def test_unsupported_key_type(self, backend):
        """An Ed25519 private key is rejected with ``TypeError``."""
        ca_cert, _ = _load_cert_key()
        ed_key = ed25519.Ed25519PrivateKey.generate()
        with pytest.raises(TypeError):
            pkcs7.PKCS7SignatureBuilder().add_signer(
                ca_cert, ed_key, hashes.SHA256()
            )

    def test_sign_invalid_options(self):
        """An options list holding a non-option value raises ``ValueError``."""
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(b"test")
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())
        with pytest.raises(ValueError):
            builder.sign(serialization.Encoding.SMIME, [b"invalid"])

    def test_sign_invalid_encoding(self):
        """``Encoding.Raw`` is not accepted by ``sign``."""
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(b"test")
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())
        with pytest.raises(ValueError):
            builder.sign(serialization.Encoding.Raw, [])

    def test_sign_invalid_options_text_no_detached(self):
        """``Text`` without ``DetachedSignature`` raises ``ValueError``."""
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(b"test")
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())
        text_only = [pkcs7.PKCS7Options.Text]
        with pytest.raises(ValueError):
            builder.sign(serialization.Encoding.SMIME, text_only)

    def test_sign_invalid_options_text_der_encoding(self):
        """``Text`` combined with DER encoding raises ``ValueError``."""
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(b"test")
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())
        text_detached = [
            pkcs7.PKCS7Options.Text,
            pkcs7.PKCS7Options.DetachedSignature,
        ]
        with pytest.raises(ValueError):
            builder.sign(serialization.Encoding.DER, text_detached)

    def test_sign_invalid_options_no_attrs_and_no_caps(self):
        """``NoAttributes`` plus ``NoCapabilities`` raises ``ValueError``."""
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(b"test")
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())
        conflicting = [
            pkcs7.PKCS7Options.NoAttributes,
            pkcs7.PKCS7Options.NoCapabilities,
        ]
        with pytest.raises(ValueError):
            builder.sign(serialization.Encoding.SMIME, conflicting)

    def test_smime_sign_detached(self, backend):
        """Detached signing keeps the data out of the PKCS7 structure."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        detached = [pkcs7.PKCS7Options.DetachedSignature]
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        smime_sig = builder.sign(serialization.Encoding.SMIME, detached)
        der_sig = builder.sign(serialization.Encoding.DER, detached)
        # Without a generic ASN.1 parser at hand, the checks look for byte
        # sequences that the parameters above are expected to produce.
        assert b"sha-256" in smime_sig
        # In detached mode the signed data is left out of the PKCS7
        # structure, but the SMIME serialization carries it as a separate
        # section ahead of the PKCS7 data. It should therefore show up in
        # the SMIME output and be absent from the DER output.
        assert payload in smime_sig
        _pkcs7_verify(
            serialization.Encoding.SMIME,
            smime_sig,
            payload,
            [ca_cert],
            detached,
            backend,
        )
        assert payload not in der_sig
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            payload,
            [ca_cert],
            detached,
            backend,
        )

    def test_sign_byteslike(self):
        """A ``bytearray`` is accepted as the data to sign."""
        payload = bytearray(b"hello world")
        ca_cert, ca_key = _load_cert_key()
        detached = [pkcs7.PKCS7Options.DetachedSignature]
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        smime_sig = builder.sign(serialization.Encoding.SMIME, detached)
        assert bytes(payload) in smime_sig

    def test_sign_pem(self, backend):
        """A PEM-encoded signature verifies with the signer certificate."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        no_options = []
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        pem_sig = builder.sign(serialization.Encoding.PEM, no_options)
        _pkcs7_verify(
            serialization.Encoding.PEM,
            pem_sig,
            None,
            [ca_cert],
            no_options,
            backend,
        )

    @pytest.mark.parametrize(
        ("hash_alg", "expected_value"),
        [
            (hashes.SHA1(), b"\x06\x05+\x0e\x03\x02\x1a"),
            (hashes.SHA256(), b"\x06\t`\x86H\x01e\x03\x04\x02\x01"),
            (hashes.SHA384(), b"\x06\t`\x86H\x01e\x03\x04\x02\x02"),
            (hashes.SHA512(), b"\x06\t`\x86H\x01e\x03\x04\x02\x03"),
        ],
    )
    def test_sign_alternate_digests_der(
        self, hash_alg, expected_value, backend
    ):
        """Each digest leaves its expected byte sequence in DER output."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hash_alg)
        no_options = []
        der_sig = builder.sign(serialization.Encoding.DER, no_options)
        assert expected_value in der_sig
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            None,
            [ca_cert],
            no_options,
            backend,
        )

    @pytest.mark.parametrize(
        ("hash_alg", "expected_value"),
        [
            (hashes.SHA1(), b"sha1"),
            (hashes.SHA256(), b"sha-256"),
            (hashes.SHA384(), b"sha-384"),
            (hashes.SHA512(), b"sha-512"),
        ],
    )
    def test_sign_alternate_digests_detached(self, hash_alg, expected_value):
        """Each digest is named in the detached SMIME output."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hash_alg)
        detached = [pkcs7.PKCS7Options.DetachedSignature]
        smime_sig = builder.sign(serialization.Encoding.SMIME, detached)
        # In detached signature mode the hash algorithm appears as a byte
        # string such as "sha-384".
        assert expected_value in smime_sig

    def test_sign_attached(self, backend):
        """Without ``DetachedSignature`` the data is embedded in the output."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        no_options = []
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        der_sig = builder.sign(serialization.Encoding.DER, no_options)
        # With no detached option the signed data lives inside the PKCS7
        # structure itself.
        assert payload in der_sig
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            None,
            [ca_cert],
            no_options,
            backend,
        )

    def test_sign_binary(self, backend):
        """``Binary`` keeps the data bytes unchanged in the output."""
        payload = b"hello\nworld"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())
        no_options = []
        der = serialization.Encoding.DER
        translated_sig = builder.sign(der, no_options)
        untouched_sig = builder.sign(der, [pkcs7.PKCS7Options.Binary])
        # Binary stops LF from being translated to CR+LF (SMIME canonical
        # form), so the raw data is expected only in the Binary signature.
        assert payload not in translated_sig
        _pkcs7_verify(
            der,
            translated_sig,
            None,
            [ca_cert],
            no_options,
            backend,
        )
        assert payload in untouched_sig
        _pkcs7_verify(
            der,
            untouched_sig,
            None,
            [ca_cert],
            no_options,
            backend,
        )

    def test_sign_smime_canonicalization(self, backend):
        """LF in the data becomes CR+LF in the default output."""
        payload = b"hello\nworld"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        no_options = []
        der_sig = builder.sign(serialization.Encoding.DER, no_options)
        # LF is converted to CR+LF (SMIME canonical form), so the original
        # data must not appear in the signature while the converted form
        # must.
        assert payload not in der_sig
        assert b"hello\r\nworld" in der_sig
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            None,
            [ca_cert],
            no_options,
            backend,
        )

    def test_sign_text(self, backend):
        """``Text`` adds a text/plain header that is part of the signed data."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        text_detached = [
            pkcs7.PKCS7Options.Text,
            pkcs7.PKCS7Options.DetachedSignature,
        ]
        smime_sig = builder.sign(serialization.Encoding.SMIME, text_detached)
        # The Text option adds text/plain headers to the S/MIME message.
        # They matter only in SMIME mode; binary output is just the PKCS7
        # structure itself.
        assert b"text/plain" in smime_sig
        # With Text the header is prepended, so this is what actually gets
        # signed.
        signed_data = b"Content-Type: text/plain\r\n\r\nhello world"
        _pkcs7_verify(
            serialization.Encoding.SMIME,
            smime_sig,
            signed_data,
            [ca_cert],
            text_detached,
            backend,
        )

    def test_sign_no_capabilities(self, backend):
        """``NoCapabilities`` drops SMIMECapabilities but keeps signingTime."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        no_caps = [pkcs7.PKCS7Options.NoCapabilities]
        der_sig = builder.sign(serialization.Encoding.DER, no_caps)
        # NoCapabilities removes the SMIMECapabilities attribute (an ASN.1
        # sequence with OID 1.2.840.113549.1.9.15) from the PKCS7
        # structure. Other authenticated attributes stay, which is checked
        # by looking for the signingTime OID.

        # 1.2.840.113549.1.9.15 SMIMECapabilities as an ASN.1 DER encoded OID
        smime_capabilities_oid = b"\x06\t*\x86H\x86\xf7\r\x01\t\x0f"
        # 1.2.840.113549.1.9.5 signingTime as an ASN.1 DER encoded OID
        signing_time_oid = b"\x06\t*\x86H\x86\xf7\r\x01\t\x05"
        assert smime_capabilities_oid not in der_sig
        assert signing_time_oid in der_sig
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            None,
            [ca_cert],
            no_caps,
            backend,
        )

    def test_sign_no_attributes(self, backend):
        """``NoAttributes`` drops both SMIMECapabilities and signingTime."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())

        no_attrs = [pkcs7.PKCS7Options.NoAttributes]
        der_sig = builder.sign(serialization.Encoding.DER, no_attrs)
        # NoAttributes removes every authenticated attribute, so neither
        # SMIMECapabilities nor signingTime should be found.

        # 1.2.840.113549.1.9.15 SMIMECapabilities as an ASN.1 DER encoded OID
        smime_capabilities_oid = b"\x06\t*\x86H\x86\xf7\r\x01\t\x0f"
        # 1.2.840.113549.1.9.5 signingTime as an ASN.1 DER encoded OID
        signing_time_oid = b"\x06\t*\x86H\x86\xf7\r\x01\t\x05"
        assert smime_capabilities_oid not in der_sig
        assert signing_time_oid not in der_sig
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            None,
            [ca_cert],
            no_attrs,
            backend,
        )

    def test_sign_no_certs(self, backend):
        """``NoCerts`` leaves the signer certificate out of the output."""
        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA256())
        der = serialization.Encoding.DER

        with_cert = builder.sign(der, [])
        assert with_cert.count(ca_cert.public_bytes(der)) == 1

        without_cert = builder.sign(der, [pkcs7.PKCS7Options.NoCerts])
        assert without_cert.count(ca_cert.public_bytes(der)) == 0

    def test_multiple_signers(self, backend):
        """Two signers using SHA512 both verify from one signature."""

        def _read_key(pemfile):
            return serialization.load_pem_private_key(pemfile.read(), None)

        def _read_cert(pemfile):
            return x509.load_pem_x509_certificate(pemfile.read())

        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        ca_dir = os.path.join("x509", "custom", "ca")
        rsa_key = load_vectors_from_file(
            os.path.join(ca_dir, "rsa_key.pem"), _read_key, mode="rb"
        )
        rsa_cert = load_vectors_from_file(
            os.path.join(ca_dir, "rsa_ca.pem"), loader=_read_cert, mode="rb"
        )
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA512())
        builder = builder.add_signer(rsa_cert, rsa_key, hashes.SHA512())
        no_options = []
        der_sig = builder.sign(serialization.Encoding.DER, no_options)
        # The structure is expected to hold three SHA512 OIDs.
        sha512_oid = b"\x06\t`\x86H\x01e\x03\x04\x02\x03"
        assert der_sig.count(sha512_oid) == 3
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            None,
            [ca_cert, rsa_cert],
            no_options,
            backend,
        )

    def test_multiple_signers_different_hash_algs(self, backend):
        """Two signers with different digests both verify."""

        def _read_key(pemfile):
            return serialization.load_pem_private_key(pemfile.read(), None)

        def _read_cert(pemfile):
            return x509.load_pem_x509_certificate(pemfile.read())

        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        ca_dir = os.path.join("x509", "custom", "ca")
        rsa_key = load_vectors_from_file(
            os.path.join(ca_dir, "rsa_key.pem"), _read_key, mode="rb"
        )
        rsa_cert = load_vectors_from_file(
            os.path.join(ca_dir, "rsa_ca.pem"), loader=_read_cert, mode="rb"
        )
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA384())
        builder = builder.add_signer(rsa_cert, rsa_key, hashes.SHA512())
        no_options = []
        der_sig = builder.sign(serialization.Encoding.DER, no_options)
        # The structure is expected to hold two SHA384 OIDs and two SHA512
        # OIDs.
        sha384_oid = b"\x06\t`\x86H\x01e\x03\x04\x02\x02"
        sha512_oid = b"\x06\t`\x86H\x01e\x03\x04\x02\x03"
        assert der_sig.count(sha384_oid) == 2
        assert der_sig.count(sha512_oid) == 2
        _pkcs7_verify(
            serialization.Encoding.DER,
            der_sig,
            None,
            [ca_cert, rsa_cert],
            no_options,
            backend,
        )

    def test_add_additional_cert_not_a_cert(self, backend):
        """``add_certificate`` rejects a bytes object with ``TypeError``."""
        with pytest.raises(TypeError):
            pkcs7.PKCS7SignatureBuilder().add_certificate(b"notacert")

    def test_add_additional_cert(self, backend):
        """An additional certificate appears once in the DER output."""

        def _read_cert(pemfile):
            return x509.load_pem_x509_certificate(pemfile.read())

        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        rsa_cert = load_vectors_from_file(
            os.path.join("x509", "custom", "ca", "rsa_ca.pem"),
            loader=_read_cert,
            mode="rb",
        )
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA384())
        builder = builder.add_certificate(rsa_cert)
        der = serialization.Encoding.DER
        der_sig = builder.sign(der, [])
        assert der_sig.count(rsa_cert.public_bytes(der)) == 1

    def test_add_multiple_additional_certs(self, backend):
        """A certificate added twice appears twice in the DER output."""

        def _read_cert(pemfile):
            return x509.load_pem_x509_certificate(pemfile.read())

        payload = b"hello world"
        ca_cert, ca_key = _load_cert_key()
        rsa_cert = load_vectors_from_file(
            os.path.join("x509", "custom", "ca", "rsa_ca.pem"),
            loader=_read_cert,
            mode="rb",
        )
        builder = pkcs7.PKCS7SignatureBuilder().set_data(payload)
        builder = builder.add_signer(ca_cert, ca_key, hashes.SHA384())
        builder = builder.add_certificate(rsa_cert)
        builder = builder.add_certificate(rsa_cert)
        der = serialization.Encoding.DER
        der_sig = builder.sign(der, [])
        assert der_sig.count(rsa_cert.public_bytes(der)) == 2
676