# Copyright (c) 2013-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""Symmetric key encryption handlers"""

from .crypto import BasicCipher, GCMCipher, ChachaCipher, get_cipher_params
from .mac import get_mac_params, get_mac
from .packet import UInt64


# SSH encryption algorithm names whose cipher lookup succeeded, kept in
# registration order.
_enc_algs = []

# Subset of _enc_algs which were registered with default=True.
_default_enc_algs = []

# Maps an SSH encryption algorithm name to (encryption class, cipher name).
_enc_params = {}


class Encryption:
    """Parent class for SSH packet encryption objects

    Subclasses override every method below except :meth:`get_mac_params`,
    which only needs overriding when the cipher supplies its own
    integrity tag instead of using a separately negotiated MAC.
    """

    @classmethod
    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
        """Construct a new SSH packet encryption object"""

        raise NotImplementedError

    @classmethod
    def get_mac_params(cls, mac_alg):
        """Get parameters of the MAC algorithm used with this encryption

        The default implementation simply delegates to the MAC module's
        ``get_mac_params`` for `mac_alg`.
        """

        return get_mac_params(mac_alg)

    def encrypt_packet(self, seq, header, packet):
        """Encrypt and sign an SSH packet"""

        raise NotImplementedError

    def decrypt_header(self, seq, first_block, header_len):
        """Decrypt an SSH packet header"""

        raise NotImplementedError

    def decrypt_packet(self, seq, first, rest, header_len, mac):
        """Verify the signature of and decrypt an SSH packet"""

        raise NotImplementedError


class BasicEncryption(Encryption):
    """Shim for basic encryption

    The MAC is computed over the plaintext (header + packet) and the
    header is encrypted together with the rest of the packet.
    """

    def __init__(self, cipher, mac):
        self._cipher = cipher
        self._mac = mac

    @classmethod
    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
        """Construct a new SSH packet encryption object for basic ciphers

        Returns an :class:`ETMEncryption` object when `etm` is true and
        an instance of `cls` otherwise. Both wrap the same cipher and
        MAC objects.
        """

        cipher = BasicCipher(cipher_name, key, iv)
        mac = get_mac(mac_alg, mac_key)

        if etm:
            return ETMEncryption(cipher, mac)

        return cls(cipher, mac)

    def encrypt_packet(self, seq, header, packet):
        """Encrypt and sign an SSH packet

        Returns a tuple of (encrypted header + packet, MAC). The MAC is
        computed over the plaintext and is ``b''`` when no MAC object
        is set.
        """

        packet = header + packet

        # NOTE(review): a falsy MAC is tolerated here but not in
        # decrypt_packet() or in ETMEncryption -- confirm whether
        # get_mac() can ever return a falsy value.
        mac = self._mac.sign(seq, packet) if self._mac else b''

        return self._cipher.encrypt(packet), mac

    def decrypt_header(self, seq, first_block, header_len):
        """Decrypt an SSH packet header

        Decrypts all of `first_block` and returns a tuple of (decrypted
        block, first `header_len` bytes of the decrypted block).
        """

        first_block = self._cipher.decrypt(first_block)

        return first_block, first_block[:header_len]

    def decrypt_packet(self, seq, first, rest, header_len, mac):
        """Verify the signature of and decrypt an SSH packet

        Only `rest` is decrypted here; `first` is used as-is
        (presumably the decrypted block returned by decrypt_header()).
        Returns the payload following the header, or `None` if the MAC
        does not verify.
        """

        packet = first + self._cipher.decrypt(rest)

        if self._mac.verify(seq, packet, mac):
            return packet[header_len:]

        return None


class ETMEncryption(BasicEncryption):
    """Shim for encrypt-then-mac encryption

    The header is left unencrypted and the MAC is computed over the
    header followed by the encrypted packet.
    """

    def encrypt_packet(self, seq, header, packet):
        """Encrypt and sign an SSH packet

        Returns a tuple of (header + encrypted packet, MAC over that
        same byte string).
        """

        packet = header + self._cipher.encrypt(packet)
        return packet, self._mac.sign(seq, packet)

    def decrypt_header(self, seq, first_block, header_len):
        """Decrypt an SSH packet header

        No decryption is performed; the first `header_len` bytes of
        `first_block` are returned unchanged alongside the block itself.
        """

        return first_block, first_block[:header_len]

    def decrypt_packet(self, seq, first, rest, header_len, mac):
        """Verify the signature of and decrypt an SSH packet

        The MAC is checked before any decryption takes place. Returns
        the decrypted payload following the header, or `None` if the
        MAC does not verify.
        """

        packet = first + rest

        if self._mac.verify(seq, packet, mac):
            return self._cipher.decrypt(packet[header_len:])

        return None


class GCMEncryption(Encryption):
    """Shim for GCM encryption"""

    def __init__(self, cipher):
        self._cipher = cipher

    @classmethod
    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
        """Construct a new SSH packet encryption object for GCM ciphers

        `mac_alg`, `mac_key`, and `etm` are accepted for interface
        compatibility but are not used.
        """

        return cls(GCMCipher(cipher_name, key, iv))

    @classmethod
    def get_mac_params(cls, mac_alg):
        """Get parameters of the MAC algorithm used with this encryption

        `mac_alg` is ignored. The fixed result is consumed by
        get_encryption_params() as (MAC key size 0, MAC hash size 16,
        etm True).
        """

        return 0, 16, True

    def encrypt_packet(self, seq, header, packet):
        """Encrypt and sign an SSH packet

        `seq` is not used; the result is whatever the cipher's
        ``encrypt_and_sign`` returns for `header` and `packet`.
        """

        return self._cipher.encrypt_and_sign(header, packet)

    def decrypt_header(self, seq, first_block, header_len):
        """Decrypt an SSH packet header

        No decryption is performed; the first `header_len` bytes of
        `first_block` are returned unchanged alongside the block itself.
        """

        return first_block, first_block[:header_len]

    def decrypt_packet(self, seq, first, rest, header_len, mac):
        """Verify the signature of and decrypt an SSH packet

        The first `header_len` bytes of `first` are passed to the cipher
        as the header, and the remainder of `first` plus `rest` as the
        data to verify and decrypt.
        """

        return self._cipher.verify_and_decrypt(first[:header_len],
                                               first[header_len:] + rest, mac)


class ChachaEncryption(Encryption):
    """Shim for chacha20-poly1305 encryption"""

    def __init__(self, cipher):
        self._cipher = cipher

    @classmethod
    def new(cls, cipher_name, key, iv, mac_alg=b'', mac_key=b'', etm=False):
        """Construct a new SSH packet encryption object for Chacha ciphers

        Only `key` is used; the remaining arguments are accepted for
        interface compatibility.
        """

        return cls(ChachaCipher(key))

    @classmethod
    def get_mac_params(cls, mac_alg):
        """Get parameters of the MAC algorithm used with this encryption

        `mac_alg` is ignored. The fixed result is consumed by
        get_encryption_params() as (MAC key size 0, MAC hash size 16,
        etm True).
        """

        return 0, 16, True

    def encrypt_packet(self, seq, header, packet):
        """Encrypt and sign an SSH packet

        The sequence number is passed to the cipher encoded with
        `UInt64`.
        """

        return self._cipher.encrypt_and_sign(header, packet, UInt64(seq))

    def decrypt_header(self, seq, first_block, header_len):
        """Decrypt an SSH packet header

        Returns a tuple of (`first_block` unchanged, the cipher's
        decryption of its first `header_len` bytes).
        """

        return (first_block,
                self._cipher.decrypt_header(first_block[:header_len],
                                            UInt64(seq)))

    def decrypt_packet(self, seq, first, rest, header_len, mac):
        """Verify the signature of and decrypt an SSH packet

        The first `header_len` bytes of `first` are passed to the cipher
        as the header, and the remainder of `first` plus `rest` as the
        data to verify and decrypt.
        """

        return self._cipher.verify_and_decrypt(first[:header_len],
                                               first[header_len:] + rest,
                                               UInt64(seq), mac)


def register_encryption_alg(enc_alg, encryption, cipher_name, default):
    """Register an encryption algorithm

    :param enc_alg: SSH name of the encryption algorithm
    :param encryption: :class:`Encryption` subclass implementing it
    :param cipher_name: Name passed to ``get_cipher_params`` and later
                        to ``encryption.new``
    :param default: Whether to also list it as a default algorithm

    If ``get_cipher_params`` raises :exc:`KeyError` for `cipher_name`
    (presumably because the cipher is unavailable), the algorithm is
    silently skipped. Registering the same name again updates its
    parameters without adding a duplicate entry to the algorithm lists.
    """

    try:
        get_cipher_params(cipher_name)
    except KeyError:
        # Deliberate best-effort: an unknown cipher is simply not offered.
        return

    if enc_alg not in _enc_algs:
        _enc_algs.append(enc_alg)

    if default and enc_alg not in _default_enc_algs:
        _default_enc_algs.append(enc_alg)

    _enc_params[enc_alg] = (encryption, cipher_name)


def get_encryption_algs():
    """Return supported encryption algorithms

    The module's internal list is returned directly, not a copy.
    """

    return _enc_algs


def get_default_encryption_algs():
    """Return default encryption algorithms

    The module's internal list is returned directly, not a copy.
    """

    return _default_enc_algs


def get_encryption_params(enc_alg, mac_alg=b''):
    """Get parameters of an encryption and MAC algorithm

    Returns a tuple of (cipher key size, cipher IV size, cipher block
    size, MAC key size, MAC hash size, etm flag). Raises
    :exc:`KeyError` if `enc_alg` has not been registered.
    """

    encryption, cipher_name = _enc_params[enc_alg]
    enc_keysize, enc_ivsize, enc_blocksize = get_cipher_params(cipher_name)
    mac_keysize, mac_hashsize, etm = encryption.get_mac_params(mac_alg)

    return (enc_keysize, enc_ivsize, enc_blocksize,
            mac_keysize, mac_hashsize, etm)


def get_encryption(enc_alg, key, iv, mac_alg=b'', mac_key=b'', etm=False):
    """Return an object which can encrypt and decrypt SSH packets

    Looks up the class registered for `enc_alg` and calls its ``new``
    constructor. Raises :exc:`KeyError` if `enc_alg` has not been
    registered.
    """

    encryption, cipher_name = _enc_params[enc_alg]

    return encryption.new(cipher_name, key, iv, mac_alg, mac_key, etm)


# Each entry is (SSH algorithm name, encryption class, cipher name,
# default). Entries are registered in the order listed, which is also
# the order of the lists returned by get_encryption_algs() and
# get_default_encryption_algs().
_enc_alg_list = (
    (b'chacha20-poly1305@openssh.com', ChachaEncryption,
     'chacha20-poly1305', True),
    (b'aes256-gcm@openssh.com',        GCMEncryption,
     'aes256-gcm',        True),
    (b'aes128-gcm@openssh.com',        GCMEncryption,
     'aes128-gcm',        True),
    (b'aes256-ctr',                    BasicEncryption,
     'aes256-ctr',        True),
    (b'aes192-ctr',                    BasicEncryption,
     'aes192-ctr',        True),
    (b'aes128-ctr',                    BasicEncryption,
     'aes128-ctr',        True),
    (b'aes256-cbc',                    BasicEncryption,
     'aes256-cbc',        False),
    (b'aes192-cbc',                    BasicEncryption,
     'aes192-cbc',        False),
    (b'aes128-cbc',                    BasicEncryption,
     'aes128-cbc',        False),
    (b'3des-cbc',                      BasicEncryption,
     'des3-cbc',          False),
    (b'blowfish-cbc',                  BasicEncryption,
     'blowfish-cbc',      False),
    (b'cast128-cbc',                   BasicEncryption,
     'cast128-cbc',       False),
    (b'seed-cbc@ssh.com',              BasicEncryption,
     'seed-cbc',          False),
    (b'arcfour256',                    BasicEncryption,
     'arcfour256',        False),
    (b'arcfour128',                    BasicEncryption,
     'arcfour128',        False),
    (b'arcfour',                       BasicEncryption,
     'arcfour',           False)
)

for _enc_alg_args in _enc_alg_list:
    register_encryption_alg(*_enc_alg_args)