# This file is part of Scapy
# Copyright (C) 2017 Maxence Tury
# This program is published under a GPLv2 license

"""
SSLv2 handshake fields & logic.
"""

import struct

from scapy.error import log_runtime, warning
from scapy.utils import randstring
from scapy.fields import ByteEnumField, ByteField, EnumField, FieldLenField, \
    ShortEnumField, StrLenField, XStrField, XStrLenField

from scapy.packet import Padding
from scapy.layers.tls.cert import Cert
from scapy.layers.tls.basefields import _tls_version, _TLSVersionField
from scapy.layers.tls.handshake import _CipherSuitesField
from scapy.layers.tls.keyexchange import _TLSSignatureField, _TLSSignature
from scapy.layers.tls.session import (_GenericTLSSessionInheritance,
                                      readConnState, writeConnState)
from scapy.layers.tls.crypto.suites import (_tls_cipher_suites,
                                            _tls_cipher_suites_cls,
                                            get_usable_ciphersuites,
                                            SSL_CK_DES_192_EDE3_CBC_WITH_MD5)


###############################################################################
#   Generic SSLv2 Handshake message                                           #
###############################################################################

# Maps the one-byte SSLv2 handshake "msgtype" value to a readable name.
_sslv2_handshake_type = {0: "error", 1: "client_hello",
                         2: "client_master_key", 3: "client_finished",
                         4: "server_hello", 5: "server_verify",
                         6: "server_finished", 7: "request_certificate",
                         8: "client_certificate"}


class _SSLv2Handshake(_GenericTLSSessionInheritance):
    """
    Inherited by other Handshake classes to get post_build().
    Also used as a fallback for unknown TLS Handshake packets.
    """
    name = "SSLv2 Handshake Generic message"
    fields_desc = [ByteEnumField("msgtype", None, _sslv2_handshake_type)]

    def guess_payload_class(self, p):
        # Anything following a handshake message is treated as padding.
        return Padding

    def tls_session_update(self, msg_str):
        """
        Covers both post_build- and post_dissection- context updates.

        Records the raw message (msg_str) and this parsed packet in the
        session's handshake message lists.
        """
        self.tls_session.handshake_messages.append(msg_str)
        self.tls_session.handshake_messages_parsed.append(self)


###############################################################################
#   Error                                                                     #
###############################################################################

# Maps the two-byte SSLv2 error code to a readable name.
_tls_error_code = {1: "no_cipher", 2: "no_certificate",
                   4: "bad_certificate", 6: "unsupported_certificate_type"}


class SSLv2Error(_SSLv2Handshake):
    """
    SSLv2 Error.
    """
    name = "SSLv2 Handshake - Error"
    fields_desc = [ByteEnumField("msgtype", 0, _sslv2_handshake_type),
                   ShortEnumField("code", None, _tls_error_code)]


###############################################################################
#   ClientHello                                                               #
###############################################################################

class _SSLv2CipherSuitesField(_CipherSuitesField):
    """
    List of SSLv2 cipher suites, each encoded on the wire as 3 bytes.
    """
    def __init__(self, name, default, dico, length_from=None):
        _CipherSuitesField.__init__(self, name, default, dico,
                                    length_from=length_from)
        # SSLv2 cipher kinds are 3 bytes wide; no struct format is used
        # here because i2m()/m2i() below do the (un)packing themselves.
        self.itemfmt = b""
        self.itemsize = 3

    def i2m(self, pkt, val):
        """
        Serialize a list of integer cipher suite values, 3 bytes each
        (high byte, then low 16 bits, big-endian).

        None is treated as an empty list and yields b"". The previous code
        assigned an empty list to a temporary but still iterated over None,
        which raised a TypeError.
        """
        if val is None:
            val = []
        return b"".join(struct.pack(">BH", x >> 16, x & 0x00ffff)
                        for x in val)

    def m2i(self, pkt, m):
        """
        Parse a byte string into a list of integer cipher suite values,
        consuming 3 bytes per suite.

        A trailing chunk shorter than 3 bytes makes struct.unpack() raise
        struct.error, since "!I" needs exactly 4 bytes after zero-padding.
        """
        res = []
        while m:
            # Left-pad with a zero byte so the 3-byte value fits "!I".
            res.append(struct.unpack("!I", b"\x00" + m[:3])[0])
            m = m[3:]
        return res


class SSLv2ClientHello(_SSLv2Handshake):
    """
    SSLv2 ClientHello.
    """
    name = "SSLv2 Handshake - Client Hello"
    fields_desc = [ByteEnumField("msgtype", 1, _sslv2_handshake_type),
                   _TLSVersionField("version", 0x0002, _tls_version),

                   FieldLenField("cipherslen", None, fmt="!H",
                                 length_of="ciphers"),
                   FieldLenField("sidlen", None, fmt="!H",
                                 length_of="sid"),
                   FieldLenField("challengelen", None, fmt="!H",
                                 length_of="challenge"),

                   XStrLenField("sid", b"",
                                length_from=lambda pkt: pkt.sidlen),
                   _SSLv2CipherSuitesField("ciphers",
                                           [SSL_CK_DES_192_EDE3_CBC_WITH_MD5],
                                           _tls_cipher_suites,
                                           length_from=lambda pkt: pkt.cipherslen),  # noqa: E501
                   XStrLenField("challenge", b"",
                                length_from=lambda pkt: pkt.challengelen)]

    def tls_session_update(self, msg_str):
        """
        Record the message, then store the advertised version, the offered
        cipher suites and the challenge in the TLS session.
        """
        super(SSLv2ClientHello, self).tls_session_update(msg_str)
        self.tls_session.advertised_tls_version = self.version
        self.tls_session.sslv2_common_cs = self.ciphers
        self.tls_session.sslv2_challenge = self.challenge


###############################################################################
#   ServerHello                                                               #
###############################################################################

class _SSLv2CertDataField(StrLenField):
    """
    Certificate field: holds a Cert instance when the bytes can be parsed,
    otherwise the raw bytes.
    """
    def getfield(self, pkt, s):
        tmp_len = 0
        if self.length_from is not None:
            tmp_len = self.length_from(pkt)
        try:
            certdata = Cert(s[:tmp_len])
        except Exception:
            # Packets are sometimes wrongly interpreted as SSLv2
            # (see record.py). We ignore failures silently
            certdata = s[:tmp_len]
        return s[tmp_len:], certdata

    def i2len(self, pkt, i):
        # A parsed certificate is measured by its DER encoding.
        if isinstance(i, Cert):
            return len(i.der)
        return len(i)

    def i2m(self, pkt, i):
        # A parsed certificate is serialized as its DER encoding.
        if isinstance(i, Cert):
            return i.der
        return i


class SSLv2ServerHello(_SSLv2Handshake):
    """
    SSLv2 ServerHello.
    """
    name = "SSLv2 Handshake - Server Hello"
    fields_desc = [ByteEnumField("msgtype", 4, _sslv2_handshake_type),

                   ByteField("sid_hit", 0),
                   ByteEnumField("certtype", 1, {1: "x509_cert"}),
                   _TLSVersionField("version", 0x0002, _tls_version),

                   FieldLenField("certlen", None, fmt="!H",
                                 length_of="cert"),
                   FieldLenField("cipherslen", None, fmt="!H",
                                 length_of="ciphers"),
                   FieldLenField("connection_idlen", None, fmt="!H",
                                 length_of="connection_id"),

                   _SSLv2CertDataField("cert", b"",
                                       length_from=lambda pkt: pkt.certlen),
                   _SSLv2CipherSuitesField("ciphers", [], _tls_cipher_suites,
                                           length_from=lambda pkt: pkt.cipherslen),  # noqa: E501
                   XStrLenField("connection_id", b"",
                                length_from=lambda pkt: pkt.connection_idlen)]

    def tls_session_update(self, msg_str):
        """
        XXX Something should be done about the session ID here.

        Records the message, narrows the session's common cipher suites to
        those also listed in this message, and stores the connection id,
        the version and (when not None) the server certificate.
        """
        super(SSLv2ServerHello, self).tls_session_update(msg_str)

        s = self.tls_session
        client_cs = s.sslv2_common_cs
        # Keep the client's ordering, restricted to what the server lists.
        css = [cs for cs in client_cs if cs in self.ciphers]
        s.sslv2_common_cs = css
        s.sslv2_connection_id = self.connection_id
        s.tls_version = self.version
        if self.cert is not None:
            s.server_certs = [self.cert]


###############################################################################
#   ClientMasterKey                                                           #
###############################################################################

class _SSLv2CipherSuiteField(EnumField):
    """
    Single SSLv2 cipher suite, encoded on the wire as 3 bytes.
    """
    def __init__(self, name, default, dico):
        EnumField.__init__(self, name, default, dico)

    def i2m(self, pkt, val):
        # None serializes to nothing; the owning packet fills it in later.
        if val is None:
            return b""
        val2 = (val >> 16, val & 0x00ffff)
        return struct.pack(">BH", val2[0], val2[1])

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)

    def m2i(self, pkt, m):
        # Left-pad with a zero byte so the 3-byte value fits "!I".
        return struct.unpack("!I", b"\x00" + m[:3])[0]

    def getfield(self, pkt, s):
        return s[3:], self.m2i(pkt, s)


class _SSLv2EncryptedKeyField(XStrLenField):
    """
    Encrypted key field whose representation also shows the packet's
    decrypted key when one is available.
    """
    def i2repr(self, pkt, x):
        parent = super(_SSLv2EncryptedKeyField, self)
        text = parent.i2repr(pkt, x)
        if pkt.decryptedkey is not None:
            text += " [decryptedkey= %s]" % parent.i2repr(pkt,
                                                          pkt.decryptedkey)
        return text


def _sslv2_cs_cls_or_none(cs_val):
    """
    Return the cipher suite class registered for cs_val, or None (after
    emitting a warning) when the value is not in _tls_cipher_suites_cls.
    """
    if cs_val in _tls_cipher_suites_cls:
        return _tls_cipher_suites_cls[cs_val]
    warning("Unknown cipher suite %d from ClientMasterKey", cs_val)
    return None


class SSLv2ClientMasterKey(_SSLv2Handshake):
    """
    SSLv2 ClientMasterKey.
    """
    __slots__ = ["decryptedkey"]
    name = "SSLv2 Handshake - Client Master Key"
    fields_desc = [ByteEnumField("msgtype", 2, _sslv2_handshake_type),
                   _SSLv2CipherSuiteField("cipher", None, _tls_cipher_suites),

                   FieldLenField("clearkeylen", None, fmt="!H",
                                 length_of="clearkey"),
                   FieldLenField("encryptedkeylen", None, fmt="!H",
                                 length_of="encryptedkey"),
                   FieldLenField("keyarglen", None, fmt="!H",
                                 length_of="keyarg"),

                   XStrLenField("clearkey", "",
                                length_from=lambda pkt: pkt.clearkeylen),
                   _SSLv2EncryptedKeyField("encryptedkey", "",
                                           length_from=lambda pkt: pkt.encryptedkeylen),  # noqa: E501
                   XStrLenField("keyarg", "",
                                length_from=lambda pkt: pkt.keyarglen)]

    def __init__(self, *args, **kargs):
        """
        When post_building, the packets fields are updated (this is somewhat
        non-standard). We might need these fields later, but calling __str__
        on a new packet (i.e. not dissected from a raw string) applies
        post_build to an object different from the original one... unless
        we hackishly always set self.explicit to 1.
        """
        # "decryptedkey" is a slot, not a field: strip it from the keyword
        # arguments before the parent constructor sees them.
        self.decryptedkey = kargs.pop("decryptedkey", b"")
        super(SSLv2ClientMasterKey, self).__init__(*args, **kargs)
        self.explicit = 1

    def pre_dissect(self, s):
        """
        Locate the encrypted key in the raw message and, when the session
        holds a server RSA key, decrypt it into self.decryptedkey (None
        otherwise). The input is returned untouched.
        """
        # Header layout: msgtype (1) + cipher (3) + three 2-byte lengths,
        # i.e. 10 bytes before the clear key starts.
        clear_len, enc_len = struct.unpack("!H", s[4:6])[0], \
            struct.unpack("!H", s[6:8])[0]
        enc_start = 10 + clear_len
        enc_key = s[enc_start:enc_start + enc_len]
        rsa_key = self.tls_session.server_rsa_key
        if rsa_key:
            self.decryptedkey = \
                self.tls_session.server_rsa_key.decrypt(enc_key)
        else:
            self.decryptedkey = None
        return s

    def post_build(self, pkt, pay):
        """
        Finalize the built message: pick a cipher suite if none was set,
        generate missing key material, fill in the length fields and
        re-assemble the message bytes followed by the payload.
        """
        if self.cipher is None:
            usable = get_usable_ciphersuites(
                self.tls_session.sslv2_common_cs, "SSLv2")
            if len(usable) == 0:
                warning("No known common cipher suite between SSLv2 Hellos.")
                cs_val = 0x0700c0
                cipher = b"\x07\x00\xc0"
            else:
                cs_val = usable[0]  # XXX choose the best one
                cipher = struct.pack(">BH", cs_val >> 16, cs_val & 0x00ffff)
            cs_cls = _tls_cipher_suites_cls[cs_val]
            self.cipher = cs_val
        else:
            cipher = pkt[1:4]
            cs_val = struct.unpack("!I", b"\x00" + cipher)[0]
            cs_cls = _sslv2_cs_cls_or_none(cs_val)

        if cs_cls:
            if (self.encryptedkey == b"" and
                    len(self.tls_session.server_certs) > 0):
                # else, the user is responsible for export slicing & encryption
                key = randstring(cs_cls.cipher_alg.key_len)

                # Export suites send all but the last 5 key bytes in clear.
                if self.clearkey == b"" and cs_cls.kx_alg.export:
                    self.clearkey = key[:-5]

                if self.decryptedkey == b"":
                    self.decryptedkey = (key[-5:] if cs_cls.kx_alg.export
                                         else key)

                pubkey = self.tls_session.server_certs[0].pubKey
                self.encryptedkey = pubkey.encrypt(self.decryptedkey)

            if self.keyarg == b"" and cs_cls.cipher_alg.type == "block":
                self.keyarg = randstring(cs_cls.cipher_alg.block_size)

        # For each variable-length field, in wire order: default its length
        # field from the actual value when unset, then pack the length.
        packed_lens = []
        values = []
        for fname in ("clearkey", "encryptedkey", "keyarg"):
            value = getattr(self, fname) or b""
            len_name = fname + "len"
            if getattr(self, len_name) is None:
                setattr(self, len_name, len(value))
            packed_lens.append(struct.pack("!H", getattr(self, len_name)))
            values.append(value)

        msg = pkt[:1] + cipher + b"".join(packed_lens) + b"".join(values)
        return msg + pay

    def tls_session_update(self, msg_str):
        """
        Record the message, create the pending read/write connection states
        for the selected cipher suite, derive keys when a decrypted key is
        available, and flag both pending states for commit.
        """
        super(SSLv2ClientMasterKey, self).tls_session_update(msg_str)

        s = self.tls_session
        cs_cls = _sslv2_cs_cls_or_none(self.cipher)

        tls_version = s.tls_version or 0x0002
        connection_end = s.connection_end
        wcs_seq_num = s.wcs.seq_num
        s.pwcs = writeConnState(ciphersuite=cs_cls,
                                connection_end=connection_end,
                                seq_num=wcs_seq_num,
                                tls_version=tls_version)
        rcs_seq_num = s.rcs.seq_num
        s.prcs = readConnState(ciphersuite=cs_cls,
                               connection_end=connection_end,
                               seq_num=rcs_seq_num,
                               tls_version=tls_version)

        if self.decryptedkey is not None:
            s.master_secret = self.clearkey + self.decryptedkey
            s.compute_sslv2_km_and_derive_keys()

            # Block ciphers take their IV from the key argument.
            for state in (s.pwcs, s.prcs):
                if state.cipher.type == "block":
                    state.cipher.iv = self.keyarg

        s.triggered_prcs_commit = True
        s.triggered_pwcs_commit = True


###############################################################################
#   ServerVerify                                                              #
###############################################################################

class SSLv2ServerVerify(_SSLv2Handshake):
    """
    In order to parse a ServerVerify, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Server Verify"
    fields_desc = [ByteEnumField("msgtype", 5, _sslv2_handshake_type),
                   XStrField("challenge", "")]

    def build(self, *args, **kargs):
        """
        Build the message, taking the challenge from the TLS session when
        the field value is None.
        """
        fval = self.getfieldval("challenge")
        # NOTE(review): the Finished messages in this module compare their
        # field against b"" instead of None; confirm which sentinel is
        # intended here, given the field's declared default is "".
        if fval is None:
            self.challenge = self.tls_session.sslv2_challenge
        return super(SSLv2ServerVerify, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log (at info level) when the dissected challenge differs from the
        one stored in the TLS session, if any.
        """
        s = self.tls_session
        if s.sslv2_challenge is not None:
            if self.challenge != s.sslv2_challenge:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid ServerVerify received [%s]", pkt_info)  # noqa: E501


###############################################################################
#   RequestCertificate                                                        #
###############################################################################

class SSLv2RequestCertificate(_SSLv2Handshake):
    """
    In order to parse a RequestCertificate, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Request Certificate"
    fields_desc = [ByteEnumField("msgtype", 7, _sslv2_handshake_type),
                   ByteEnumField("authtype", 1, {1: "md5_with_rsa"}),
                   XStrField("challenge", "")]

    def tls_session_update(self, msg_str):
        """
        Record the message and store its challenge in the TLS session as
        the client-certificate challenge.
        """
        super(SSLv2RequestCertificate, self).tls_session_update(msg_str)
        self.tls_session.sslv2_challenge_clientcert = self.challenge


###############################################################################
#   ClientCertificate                                                         #
###############################################################################

class SSLv2ClientCertificate(_SSLv2Handshake):
    """
    SSLv2 ClientCertificate.
    """
    name = "SSLv2 Handshake - Client Certificate"
    fields_desc = [ByteEnumField("msgtype", 8, _sslv2_handshake_type),

                   ByteEnumField("certtype", 1, {1: "x509_cert"}),
                   FieldLenField("certlen", None, fmt="!H",
                                 length_of="certdata"),
                   FieldLenField("responselen", None, fmt="!H",
                                 length_of="responsedata"),

                   _SSLv2CertDataField("certdata", b"",
                                       length_from=lambda pkt: pkt.certlen),
                   _TLSSignatureField("responsedata", None,
                                      length_from=lambda pkt: pkt.responselen)]

    def build(self, *args, **kargs):
        """
        Build the message, computing the response signature when none was
        provided.

        When "responsedata" is None and the session holds the key material,
        the client-certificate challenge and at least one server
        certificate, a signature over their concatenation is generated
        with the session's client key. When it is None but the session
        lacks that data, the field is set to b"". A response supplied by
        the caller is left untouched; it used to be overwritten with b"".
        """
        s = self.tls_session
        sig = self.getfieldval("responsedata")
        if sig is None:
            can_sign = (s.sslv2_key_material is not None and
                        s.sslv2_challenge_clientcert is not None and
                        len(s.server_certs) > 0)
            if can_sign:
                m = (s.sslv2_key_material +
                     s.sslv2_challenge_clientcert +
                     s.server_certs[0].der)
                self.responsedata = _TLSSignature(tls_session=s)
                self.responsedata._update_sig(m, s.client_key)
            else:
                self.responsedata = b""
        return super(SSLv2ClientCertificate, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session from the dissected message, then verify the
        response signature against the first client certificate when the
        session holds everything needed; a failed check is only logged.
        """
        self.tls_session_update(msg_str)

        s = self.tls_session
        test = (len(s.client_certs) > 0 and
                s.sslv2_key_material is not None and
                s.sslv2_challenge_clientcert is not None and
                len(s.server_certs) > 0)
        if test:
            # Same signed content as the one assembled in build().
            m = (s.sslv2_key_material +
                 s.sslv2_challenge_clientcert +
                 s.server_certs[0].der)
            sig_test = self.responsedata._verify_sig(m, s.client_certs[0])
            if not sig_test:
                pkt_info = self.firstlayer().summary()
                log_runtime.info("TLS: invalid client CertificateVerify signature [%s]", pkt_info)  # noqa: E501

    def tls_session_update(self, msg_str):
        """
        Record the message and, when certificate data is present, store it
        as the session's client certificate list.
        """
        super(SSLv2ClientCertificate, self).tls_session_update(msg_str)
        if self.certdata:
            self.tls_session.client_certs = [self.certdata]


###############################################################################
#   Finished                                                                  #
###############################################################################

class SSLv2ClientFinished(_SSLv2Handshake):
    """
    In order to parse a ClientFinished, the exact message string should be fed
    to the class. SSLv2 does not offer any other way to know the c_id length.
    """
    name = "SSLv2 Handshake - Client Finished"
    fields_desc = [ByteEnumField("msgtype", 3, _sslv2_handshake_type),
                   XStrField("connection_id", "")]

    def build(self, *args, **kargs):
        """
        Build the message, taking the connection id from the TLS session
        when the field is empty.
        """
        fval = self.getfieldval("connection_id")
        if fval == b"":
            self.connection_id = self.tls_session.sslv2_connection_id
        return super(SSLv2ClientFinished, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log (at info level) when the dissected connection id differs from
        the one stored in the TLS session, if any.
        """
        s = self.tls_session
        if s.sslv2_connection_id is not None:
            if self.connection_id != s.sslv2_connection_id:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid client Finished received [%s]", pkt_info)  # noqa: E501


class SSLv2ServerFinished(_SSLv2Handshake):
    """
    In order to parse a ServerFinished, the exact message string should be fed
    to the class. SSLv2 does not offer any other way to know the sid length.
    """
    name = "SSLv2 Handshake - Server Finished"
    fields_desc = [ByteEnumField("msgtype", 6, _sslv2_handshake_type),
                   XStrField("sid", "")]

    def build(self, *args, **kargs):
        """
        Build the message, taking the session id from the TLS session when
        the field is empty.
        """
        fval = self.getfieldval("sid")
        if fval == b"":
            self.sid = self.tls_session.sid
        return super(SSLv2ServerFinished, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session from the dissected message and store its
        session id in the TLS session.
        """
        self.tls_session_update(msg_str)
        self.tls_session.sid = self.sid


###############################################################################
#   All handshake messages defined in this module                             #
###############################################################################

# Maps the one-byte handshake "msgtype" value to the class that handles it.
_sslv2_handshake_cls = {0: SSLv2Error, 1: SSLv2ClientHello,
                        2: SSLv2ClientMasterKey, 3: SSLv2ClientFinished,
                        4: SSLv2ServerHello, 5: SSLv2ServerVerify,
                        6: SSLv2ServerFinished, 7: SSLv2RequestCertificate,
                        8: SSLv2ClientCertificate}