1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7from cryptography import utils
8from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
9from cryptography.hazmat.primitives import serialization
10from cryptography.hazmat.primitives.asymmetric import dh
11
12
def _dh_params_dup(dh_cdata, backend):
    """Return a duplicate of the parameters stored in ``dh_cdata``.

    :param dh_cdata: DH cdata handle whose parameters are duplicated with
        ``DHparams_dup``.
    :param backend: Backend object providing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :returns: The new DH cdata, wrapped by ``ffi.gc`` with ``lib.DH_free``.
    """
    lib = backend._lib
    ffi = backend._ffi

    param_cdata = lib.DHparams_dup(dh_cdata)
    backend.openssl_assert(param_cdata != ffi.NULL)
    param_cdata = ffi.gc(param_cdata, lib.DH_free)
    if lib.CRYPTOGRAPHY_IS_LIBRESSL:
        # In LibreSSL DHparams_dup doesn't copy q, so copy it explicitly.
        q = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
        # Nothing to copy when the source has no q at all.
        if q[0] != ffi.NULL:
            q_dup = lib.BN_dup(q[0])
            # A failed duplication must be reported instead of silently
            # producing parameters without q.
            backend.openssl_assert(q_dup != ffi.NULL)
            res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
            backend.openssl_assert(res == 1)

    return param_cdata
29
30
def _dh_cdata_to_parameters(dh_cdata, backend):
    """Wrap a duplicate of ``dh_cdata``'s parameters in ``_DHParameters``."""
    return _DHParameters(backend, _dh_params_dup(dh_cdata, backend))
34
35
@utils.register_interface(dh.DHParametersWithSerialization)
class _DHParameters(object):
    """DH parameters backed by a backend object and a DH cdata handle."""

    def __init__(self, backend, dh_cdata):
        self._backend = backend
        self._dh_cdata = dh_cdata

    def parameter_numbers(self):
        """Return p, g and q (``None`` when unset) as DHParameterNumbers."""
        backend = self._backend
        ffi = backend._ffi
        null = ffi.NULL

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        backend._lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != null)
        backend.openssl_assert(g_ptr[0] != null)

        # q is optional: a NULL pointer is reported as None.
        q_val = None if q_ptr[0] == null else backend._bn_to_int(q_ptr[0])
        return dh.DHParameterNumbers(
            p=backend._bn_to_int(p_ptr[0]),
            g=backend._bn_to_int(g_ptr[0]),
            q=q_val,
        )

    def generate_private_key(self):
        """Ask the backend to generate a private key for these parameters."""
        return self._backend.generate_dh_private_key(self)

    def parameter_bytes(self, encoding, format):
        """Serialize the parameters through the backend.

        :raises ValueError: if ``format`` is not ``ParameterFormat.PKCS3``.
        :raises UnsupportedAlgorithm: if q is set while the library lacks
            ``Cryptography_HAS_EVP_PKEY_DHX``.
        """
        if format is not serialization.ParameterFormat.PKCS3:
            raise ValueError("Only PKCS3 serialization is supported")

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            null = backend._ffi.NULL
            q_ptr = backend._ffi.new("BIGNUM **")
            # Only q is of interest here; p and g are skipped with NULL.
            backend._lib.DH_get0_pqg(self._dh_cdata, null, q_ptr, null)
            if q_ptr[0] != null:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION,
                )

        return backend._parameter_bytes(encoding, format, self._dh_cdata)
80
81
def _get_dh_num_bits(backend, dh_cdata):
    """Return ``BN_num_bits`` of the parameter p stored in ``dh_cdata``."""
    ffi = backend._ffi
    lib = backend._lib

    p_ptr = ffi.new("BIGNUM **")
    # Only p is requested; q and g are skipped by passing NULL.
    lib.DH_get0_pqg(dh_cdata, p_ptr, ffi.NULL, ffi.NULL)
    p_bn = p_ptr[0]
    backend.openssl_assert(p_bn != ffi.NULL)
    return lib.BN_num_bits(p_bn)
87
88
@utils.register_interface(dh.DHPrivateKeyWithSerialization)
class _DHPrivateKey(object):
    """DH private key backed by a DH cdata handle and an EVP_PKEY handle."""

    def __init__(self, backend, dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Used by exchange() to size its buffer and to pad its result.
        self._key_size_bytes = backend._lib.DH_size(dh_cdata)

    @property
    def key_size(self):
        """Bit length of p, computed from the DH cdata on each access."""
        return _get_dh_num_bits(self._backend, self._dh_cdata)

    def private_numbers(self):
        """Return the parameters, public and private values as numbers."""
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib
        null = ffi.NULL
        to_int = backend._bn_to_int

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != null)
        backend.openssl_assert(g_ptr[0] != null)
        # q is optional: a NULL pointer is reported as None.
        q_val = None if q_ptr[0] == null else to_int(q_ptr[0])

        pub_ptr = ffi.new("BIGNUM **")
        priv_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, pub_ptr, priv_ptr)
        backend.openssl_assert(pub_ptr[0] != null)
        backend.openssl_assert(priv_ptr[0] != null)

        parameter_numbers = dh.DHParameterNumbers(
            p=to_int(p_ptr[0]),
            g=to_int(g_ptr[0]),
            q=q_val,
        )
        public_numbers = dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=to_int(pub_ptr[0]),
        )
        return dh.DHPrivateNumbers(
            public_numbers=public_numbers,
            x=to_int(priv_ptr[0]),
        )

    def exchange(self, peer_public_key):
        """Compute the shared key with ``peer_public_key``.

        The result is left-padded with zero bytes up to the size reported
        by ``DH_size`` for this key.

        :raises ValueError: if ``DH_compute_key`` returns -1.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        out = ffi.new("unsigned char[]", self._key_size_bytes)
        peer_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(peer_public_key._dh_cdata, peer_ptr, ffi.NULL)
        backend.openssl_assert(peer_ptr[0] != ffi.NULL)
        written = lib.DH_compute_key(out, peer_ptr[0], self._dh_cdata)

        if written == -1:
            errors_with_text = backend._consume_errors_with_text()
            raise ValueError(
                "Error computing shared key. Public key is likely invalid "
                "for this exchange.",
                errors_with_text,
            )

        backend.openssl_assert(written >= 1)

        shared = ffi.buffer(out)[:written]
        missing = self._key_size_bytes - len(shared)
        if missing > 0:
            # Restore the leading zero bytes omitted from a shorter result.
            shared = (b"\x00" * missing) + shared
        return shared

    def public_key(self):
        """Return a ``_DHPublicKey`` built from copies of this key's data."""
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        public_cdata = _dh_params_dup(self._dh_cdata, backend)
        pub_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, pub_ptr, ffi.NULL)
        backend.openssl_assert(pub_ptr[0] != ffi.NULL)
        pub_copy = lib.BN_dup(pub_ptr[0])
        backend.openssl_assert(pub_copy != ffi.NULL)

        # Only the public value is set; the private slot is passed as NULL.
        res = lib.DH_set0_key(public_cdata, pub_copy, ffi.NULL)
        backend.openssl_assert(res == 1)
        evp_pkey = backend._dh_cdata_to_evp_pkey(public_cdata)
        return _DHPublicKey(backend, public_cdata, evp_pkey)

    def parameters(self):
        """Return a ``_DHParameters`` built from a copy of the parameters."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def private_bytes(self, encoding, format, encryption_algorithm):
        """Serialize the private key through the backend.

        :raises ValueError: if ``format`` is not ``PrivateFormat.PKCS8``.
        :raises UnsupportedAlgorithm: if q is set while the library lacks
            ``Cryptography_HAS_EVP_PKEY_DHX``.
        """
        if format is not serialization.PrivateFormat.PKCS8:
            raise ValueError(
                "DH private keys support only PKCS8 serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            null = backend._ffi.NULL
            q_ptr = backend._ffi.new("BIGNUM **")
            # Only q is of interest here; p and g are skipped with NULL.
            backend._lib.DH_get0_pqg(self._dh_cdata, null, q_ptr, null)
            if q_ptr[0] != null:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION,
                )

        return backend._private_key_bytes(
            encoding,
            format,
            encryption_algorithm,
            self,
            self._evp_pkey,
            self._dh_cdata,
        )
206
207
@utils.register_interface(dh.DHPublicKeyWithSerialization)
class _DHPublicKey(object):
    """DH public key backed by a DH cdata handle and an EVP_PKEY handle."""

    def __init__(self, backend, dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Computed once at construction and returned by key_size.
        self._key_size_bits = _get_dh_num_bits(backend, dh_cdata)

    @property
    def key_size(self):
        """Bit length of p as computed when the key object was created."""
        return self._key_size_bits

    def public_numbers(self):
        """Return the parameters and the public value as DHPublicNumbers."""
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib
        null = ffi.NULL
        to_int = backend._bn_to_int

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != null)
        backend.openssl_assert(g_ptr[0] != null)
        # q is optional: a NULL pointer is reported as None.
        q_val = None if q_ptr[0] == null else to_int(q_ptr[0])

        pub_ptr = ffi.new("BIGNUM **")
        # Only the public value is requested; the private slot gets NULL.
        lib.DH_get0_key(self._dh_cdata, pub_ptr, null)
        backend.openssl_assert(pub_ptr[0] != null)

        parameter_numbers = dh.DHParameterNumbers(
            p=to_int(p_ptr[0]),
            g=to_int(g_ptr[0]),
            q=q_val,
        )
        return dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=to_int(pub_ptr[0]),
        )

    def parameters(self):
        """Return a ``_DHParameters`` built from a copy of the parameters."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def public_bytes(self, encoding, format):
        """Serialize the public key through the backend.

        :raises ValueError: if ``format`` is not
            ``PublicFormat.SubjectPublicKeyInfo``.
        :raises UnsupportedAlgorithm: if q is set while the library lacks
            ``Cryptography_HAS_EVP_PKEY_DHX``.
        """
        if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
            raise ValueError(
                "DH public keys support only "
                "SubjectPublicKeyInfo serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            null = backend._ffi.NULL
            q_ptr = backend._ffi.new("BIGNUM **")
            # Only q is of interest here; p and g are skipped with NULL.
            backend._lib.DH_get0_pqg(self._dh_cdata, null, q_ptr, null)
            if q_ptr[0] != null:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION,
                )

        return backend._public_key_bytes(
            encoding, format, self, self._evp_pkey, None
        )
272