1# Copyright (c) 2013-2021 by Ron Frederick <ronf@timeheart.net> and others.
2#
3# This program and the accompanying materials are made available under
4# the terms of the Eclipse Public License v2.0 which accompanies this
5# distribution and is available at:
6#
7#     http://www.eclipse.org/legal/epl-2.0/
8#
9# This program may also be made available under the following secondary
10# licenses when the conditions for such availability set forth in the
11# Eclipse Public License v2.0 are satisfied:
12#
13#    GNU General Public License, Version 2.0, or any later versions of
14#    that license
15#
16# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
17#
18# Contributors:
19#     Ron Frederick - initial implementation, API, and documentation
20
21"""Symmetric key encryption handlers"""
22
23from .crypto import BasicCipher, GCMCipher, ChachaCipher, get_cipher_params
24from .mac import get_mac_params, get_mac
25from .packet import UInt64
26
27
28_enc_algs = []
29_default_enc_algs = []
30
31_enc_params = {}
32
33
34class Encryption:
35    """Parent class for SSH packet encryption objects"""
36
37    @classmethod
38    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
39        """Construct a new SSH packet encryption object"""
40
41        raise NotImplementedError
42
43    @classmethod
44    def get_mac_params(cls, mac_alg):
45        """Get paramaters of the MAC algorithm used with this encryption"""
46
47        return get_mac_params(mac_alg)
48
49    def encrypt_packet(self, seq, header, packet):
50        """Encrypt and sign an SSH packet"""
51
52        raise NotImplementedError
53
54    def decrypt_header(self, seq, first_block, header_len):
55        """Decrypt an SSH packet header"""
56
57        raise NotImplementedError
58
59    def decrypt_packet(self, seq, first, rest, header_len, mac):
60        """Verify the signature of and decrypt an SSH packet"""
61
62        raise NotImplementedError
63
64
65class BasicEncryption(Encryption):
66    """Shim for basic encryption"""
67
68    def __init__(self, cipher, mac):
69        self._cipher = cipher
70        self._mac = mac
71
72    @classmethod
73    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
74        """Construct a new SSH packet encryption object for basic ciphers"""
75
76        cipher = BasicCipher(cipher_name, key, iv)
77        mac = get_mac(mac_alg, mac_key)
78
79        if etm:
80            return ETMEncryption(cipher, mac)
81        else:
82            return cls(cipher, mac)
83
84    def encrypt_packet(self, seq, header, packet):
85        """Encrypt and sign an SSH packet"""
86
87        packet = header + packet
88        mac = self._mac.sign(seq, packet) if self._mac else b''
89
90        return self._cipher.encrypt(packet), mac
91
92    def decrypt_header(self, seq, first_block, header_len):
93        """Decrypt an SSH packet header"""
94
95        first_block = self._cipher.decrypt(first_block)
96
97        return first_block, first_block[:header_len]
98
99    def decrypt_packet(self, seq, first, rest, header_len, mac):
100        """Verify the signature of and decrypt an SSH packet"""
101
102        packet = first + self._cipher.decrypt(rest)
103
104        if self._mac.verify(seq, packet, mac):
105            return packet[header_len:]
106        else:
107            return None
108
109
110class ETMEncryption(BasicEncryption):
111    """Shim for encrypt-then-mac encryption"""
112
113    def encrypt_packet(self, seq, header, packet):
114        """Encrypt and sign an SSH packet"""
115
116        packet = header + self._cipher.encrypt(packet)
117        return packet, self._mac.sign(seq, packet)
118
119    def decrypt_header(self, seq, first_block, header_len):
120        """Decrypt an SSH packet header"""
121
122        return first_block, first_block[:header_len]
123
124    def decrypt_packet(self, seq, first, rest, header_len, mac):
125        """Verify the signature of and decrypt an SSH packet"""
126
127        packet = first + rest
128
129        if self._mac.verify(seq, packet, mac):
130            return self._cipher.decrypt(packet[header_len:])
131        else:
132            return None
133
134
135class GCMEncryption(Encryption):
136    """Shim for GCM encryption"""
137
138    def __init__(self, cipher):
139        self._cipher = cipher
140
141    @classmethod
142    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
143        """Construct a new SSH packet encryption object for GCM ciphers"""
144
145        return cls(GCMCipher(cipher_name, key, iv))
146
147    @classmethod
148    def get_mac_params(cls, mac_alg):
149        """Get paramaters of the MAC algorithm used with this encryption"""
150
151        return 0, 16, True
152
153    def encrypt_packet(self, seq, header, packet):
154        """Encrypt and sign an SSH packet"""
155
156        return self._cipher.encrypt_and_sign(header, packet)
157
158    def decrypt_header(self, seq, first_block, header_len):
159        """Decrypt an SSH packet header"""
160
161        return first_block, first_block[:header_len]
162
163    def decrypt_packet(self, seq, first, rest, header_len, mac):
164        """Verify the signature of and decrypt an SSH packet"""
165
166        return self._cipher.verify_and_decrypt(first[:header_len],
167                                               first[header_len:] + rest, mac)
168
169
170class ChachaEncryption(Encryption):
171    """Shim for chacha20-poly1305 encryption"""
172
173    def __init__(self, cipher):
174        self._cipher = cipher
175
176    @classmethod
177    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
178        """Construct a new SSH packet encryption object for Chacha ciphers"""
179
180        return cls(ChachaCipher(key))
181
182    @classmethod
183    def get_mac_params(cls, mac_alg):
184        """Get paramaters of the MAC algorithm used with this encryption"""
185
186        return 0, 16, True
187
188    def encrypt_packet(self, seq, header, packet):
189        """Encrypt and sign an SSH packet"""
190
191        return self._cipher.encrypt_and_sign(header, packet, UInt64(seq))
192
193    def decrypt_header(self, seq, first_block, header_len):
194        """Decrypt an SSH packet header"""
195
196        return (first_block,
197                self._cipher.decrypt_header(first_block[:header_len],
198                                            UInt64(seq)))
199
200    def decrypt_packet(self, seq, first, rest, header_len, mac):
201        """Verify the signature of and decrypt an SSH packet"""
202
203        return self._cipher.verify_and_decrypt(first[:header_len],
204                                               first[header_len:] + rest,
205                                               UInt64(seq), mac)
206
207
208def register_encryption_alg(enc_alg, encryption, cipher_name, default):
209    """Register an encryption algorithm"""
210
211    try:
212        get_cipher_params(cipher_name)
213    except KeyError:
214        pass
215    else:
216        _enc_algs.append(enc_alg)
217
218        if default:
219            _default_enc_algs.append(enc_alg)
220
221        _enc_params[enc_alg] = (encryption, cipher_name)
222
223
224def get_encryption_algs():
225    """Return supported encryption algorithms"""
226
227    return _enc_algs
228
229
230def get_default_encryption_algs():
231    """Return default encryption algorithms"""
232
233    return _default_enc_algs
234
235
236def get_encryption_params(enc_alg, mac_alg=b''):
237    """Get parameters of an encryption and MAC algorithm"""
238
239    encryption, cipher_name = _enc_params[enc_alg]
240    enc_keysize, enc_ivsize, enc_blocksize = get_cipher_params(cipher_name)
241    mac_keysize, mac_hashsize, etm = encryption.get_mac_params(mac_alg)
242
243    return (enc_keysize, enc_ivsize, enc_blocksize,
244            mac_keysize, mac_hashsize, etm)
245
246
247def get_encryption(enc_alg, key, iv, mac_alg=b'', mac_key=b'', etm=False):
248    """Return an object which can encrypt and decrypt SSH packets"""
249
250    encryption, cipher_name = _enc_params[enc_alg]
251
252    return encryption.new(cipher_name, key, iv, mac_alg, mac_key, etm)
253
254
255_enc_alg_list = (
256    (b'chacha20-poly1305@openssh.com', ChachaEncryption,
257     'chacha20-poly1305', True),
258    (b'aes256-gcm@openssh.com',        GCMEncryption,
259     'aes256-gcm',        True),
260    (b'aes128-gcm@openssh.com',        GCMEncryption,
261     'aes128-gcm',        True),
262    (b'aes256-ctr',                    BasicEncryption,
263     'aes256-ctr',        True),
264    (b'aes192-ctr',                    BasicEncryption,
265     'aes192-ctr',        True),
266    (b'aes128-ctr',                    BasicEncryption,
267     'aes128-ctr',        True),
268    (b'aes256-cbc',                    BasicEncryption,
269     'aes256-cbc',        False),
270    (b'aes192-cbc',                    BasicEncryption,
271     'aes192-cbc',        False),
272    (b'aes128-cbc',                    BasicEncryption,
273     'aes128-cbc',        False),
274    (b'3des-cbc',                      BasicEncryption,
275     'des3-cbc',          False),
276    (b'blowfish-cbc',                  BasicEncryption,
277     'blowfish-cbc',      False),
278    (b'cast128-cbc',                   BasicEncryption,
279     'cast128-cbc',       False),
280    (b'seed-cbc@ssh.com',              BasicEncryption,
281     'seed-cbc',          False),
282    (b'arcfour256',                    BasicEncryption,
283     'arcfour256',        False),
284    (b'arcfour128',                    BasicEncryption,
285     'arcfour128',        False),
286    (b'arcfour',                       BasicEncryption,
287     'arcfour',           False)
288)
289
290for _enc_alg_args in _enc_alg_list:
291    register_encryption_alg(*_enc_alg_args)
292