1# This file is part of Scapy
2# Copyright (C) 2017 Maxence Tury
3# This program is published under a GPLv2 license
4
5"""
6SSLv2 handshake fields & logic.
7"""
8
9import struct
10
11from scapy.error import log_runtime, warning
12from scapy.utils import randstring
13from scapy.fields import ByteEnumField, ByteField, EnumField, FieldLenField, \
14    ShortEnumField, StrLenField, XStrField, XStrLenField
15
16from scapy.packet import Padding
17from scapy.layers.tls.cert import Cert
18from scapy.layers.tls.basefields import _tls_version, _TLSVersionField
19from scapy.layers.tls.handshake import _CipherSuitesField
20from scapy.layers.tls.keyexchange import _TLSSignatureField, _TLSSignature
21from scapy.layers.tls.session import (_GenericTLSSessionInheritance,
22                                      readConnState, writeConnState)
23from scapy.layers.tls.crypto.suites import (_tls_cipher_suites,
24                                            _tls_cipher_suites_cls,
25                                            get_usable_ciphersuites,
26                                            SSL_CK_DES_192_EDE3_CBC_WITH_MD5)
27
28
29###############################################################################
30#   Generic SSLv2 Handshake message                                           #
31###############################################################################
32
# Symbolic names of the SSLv2 handshake messages, keyed by the value of
# their one-byte "msgtype" field.
_sslv2_handshake_type = {
    0: "error",
    1: "client_hello",
    2: "client_master_key",
    3: "client_finished",
    4: "server_hello",
    5: "server_verify",
    6: "server_finished",
    7: "request_certificate",
    8: "client_certificate",
}
38
39
class _SSLv2Handshake(_GenericTLSSessionInheritance):
    """
    Common base of the SSLv2 handshake message classes of this module.
    Also used as a fallback for unknown TLS Handshake packets.
    """
    name = "SSLv2 Handshake Generic message"
    fields_desc = [
        ByteEnumField("msgtype", None, _sslv2_handshake_type),
    ]

    def guess_payload_class(self, p):
        # Anything following a handshake message is handed over as Padding.
        return Padding

    def tls_session_update(self, msg_str):
        """
        Register this message in the TLS session: msg_str goes into
        handshake_messages and the packet itself into
        handshake_messages_parsed. This serves both the post-build and the
        post-dissection context updates.
        """
        session = self.tls_session
        session.handshake_messages.append(msg_str)
        session.handshake_messages_parsed.append(self)
57
58
59###############################################################################
60#   Error                                                                     #
61###############################################################################
62
# Symbolic names of the error codes carried by the SSLv2 Error message.
_tls_error_code = {
    1: "no_cipher",
    2: "no_certificate",
    4: "bad_certificate",
    6: "unsupported_certificate_type",
}
65
66
class SSLv2Error(_SSLv2Handshake):
    """
    SSLv2 Error message: the message type followed by a two-byte error code.
    """
    name = "SSLv2 Handshake - Error"
    fields_desc = [
        ByteEnumField("msgtype", 0, _sslv2_handshake_type),
        ShortEnumField("code", None, _tls_error_code),
    ]
74
75
76###############################################################################
77#   ClientHello                                                               #
78###############################################################################
79
class _SSLv2CipherSuitesField(_CipherSuitesField):
    """
    List of SSLv2 cipher suites. Each suite is an integer which takes
    exactly 3 bytes on the wire (big-endian).
    """
    def __init__(self, name, default, dico, length_from=None):
        _CipherSuitesField.__init__(self, name, default, dico,
                                    length_from=length_from)
        # The parent's struct format cannot describe a 3-byte item, hence
        # the empty format; packing is done by hand in i2m()/m2i().
        self.itemfmt = b""
        self.itemsize = 3

    def i2m(self, pkt, val):
        """
        Serialize the list of cipher suite values 'val', 3 bytes per suite.
        None is serialized like an empty list, i.e. as b"".
        """
        if val is None:
            # Previously this fell through and iterated over None,
            # which raised a TypeError.
            val = []
        # Split each 24-bit value into its high byte and its low 16 bits.
        return b"".join(struct.pack(">BH", x >> 16, x & 0x00ffff)
                        for x in val)

    def m2i(self, pkt, m):
        """
        Parse the raw string 'm' into a list of integers, consuming it 3
        bytes at a time.
        """
        res = []
        while m:
            # Left-pad the 3-byte item so it can be read as a 32-bit integer.
            res.append(struct.unpack("!I", b"\x00" + m[:3])[0])
            m = m[3:]
        return res
99
100
class SSLv2ClientHello(_SSLv2Handshake):
    """
    SSLv2 ClientHello: the three length fields come first, followed by the
    session id, the list of cipher suites and the challenge.
    """
    name = "SSLv2 Handshake - Client Hello"
    fields_desc = [
        ByteEnumField("msgtype", 1, _sslv2_handshake_type),
        _TLSVersionField("version", 0x0002, _tls_version),

        # Lengths of the three variable-size fields below.
        FieldLenField("cipherslen", None, fmt="!H", length_of="ciphers"),
        FieldLenField("sidlen", None, fmt="!H", length_of="sid"),
        FieldLenField("challengelen", None, fmt="!H", length_of="challenge"),

        XStrLenField("sid", b"", length_from=lambda pkt: pkt.sidlen),
        _SSLv2CipherSuitesField(
            "ciphers",
            [SSL_CK_DES_192_EDE3_CBC_WITH_MD5],
            _tls_cipher_suites,
            length_from=lambda pkt: pkt.cipherslen,
        ),
        XStrLenField("challenge", b"",
                     length_from=lambda pkt: pkt.challengelen),
    ]

    def tls_session_update(self, msg_str):
        """
        Besides registering the message, store the advertised version, the
        offered cipher suites and the challenge in the session.
        """
        super(SSLv2ClientHello, self).tls_session_update(msg_str)
        session = self.tls_session
        session.advertised_tls_version = self.version
        session.sslv2_common_cs = self.ciphers
        session.sslv2_challenge = self.challenge
130
131
132###############################################################################
133#   ServerHello                                                               #
134###############################################################################
135
class _SSLv2CertDataField(StrLenField):
    """
    Certificate data field. The value is a Cert object whenever the raw
    data could be parsed as one, and the raw string otherwise.
    """

    def getfield(self, pkt, s):
        size = 0 if self.length_from is None else self.length_from(pkt)
        raw, remain = s[:size], s[size:]
        try:
            value = Cert(raw)
        except Exception:
            # Packets are sometimes wrongly interpreted as SSLv2
            # (see record.py), so parsing failures are ignored silently
            # and the raw data is kept instead.
            value = raw
        return remain, value

    def i2len(self, pkt, i):
        # A Cert is measured through its DER encoding.
        return len(i.der if isinstance(i, Cert) else i)

    def i2m(self, pkt, i):
        # A Cert is serialized as its DER encoding; raw data goes as is.
        return i.der if isinstance(i, Cert) else i
158
159
class SSLv2ServerHello(_SSLv2Handshake):
    """
    SSLv2 ServerHello: fixed-size header, three length fields, then the
    certificate, the list of cipher suites and the connection id.
    """
    name = "SSLv2 Handshake - Server Hello"
    fields_desc = [
        ByteEnumField("msgtype", 4, _sslv2_handshake_type),

        ByteField("sid_hit", 0),
        ByteEnumField("certtype", 1, {1: "x509_cert"}),
        _TLSVersionField("version", 0x0002, _tls_version),

        # Lengths of the three variable-size fields below.
        FieldLenField("certlen", None, fmt="!H", length_of="cert"),
        FieldLenField("cipherslen", None, fmt="!H", length_of="ciphers"),
        FieldLenField("connection_idlen", None, fmt="!H",
                      length_of="connection_id"),

        _SSLv2CertDataField("cert", b"",
                            length_from=lambda pkt: pkt.certlen),
        _SSLv2CipherSuitesField("ciphers", [], _tls_cipher_suites,
                                length_from=lambda pkt: pkt.cipherslen),
        XStrLenField("connection_id", b"",
                     length_from=lambda pkt: pkt.connection_idlen),
    ]

    def tls_session_update(self, msg_str):
        """
        Narrow the session's cipher suite list down to the suites also
        listed here, then store the connection id, the version and the
        server certificate.

        XXX Something should be done about the session ID here.
        """
        super(SSLv2ServerHello, self).tls_session_update(msg_str)

        session = self.tls_session
        server_cs = self.ciphers
        # Keep the order of the suites already stored in the session.
        session.sslv2_common_cs = [cs for cs in session.sslv2_common_cs
                                   if cs in server_cs]
        session.sslv2_connection_id = self.connection_id
        session.tls_version = self.version
        if self.cert is not None:
            session.server_certs = [self.cert]
199
200
201###############################################################################
202#   ClientMasterKey                                                           #
203###############################################################################
204
class _SSLv2CipherSuiteField(EnumField):
    """
    A single SSLv2 cipher suite, stored as an integer and encoded on
    3 bytes on the wire.
    """

    def __init__(self, name, default, dico):
        EnumField.__init__(self, name, default, dico)

    def i2m(self, pkt, val):
        # An unset value contributes no byte at all.
        if val is None:
            return b""
        high_byte = val >> 16
        low_short = val & 0x00ffff
        return struct.pack(">BH", high_byte, low_short)

    def addfield(self, pkt, s, val):
        encoded = self.i2m(pkt, val)
        return s + encoded

    def m2i(self, pkt, m):
        # Left-pad the first 3 bytes so they can be read as a 32-bit integer.
        (value,) = struct.unpack("!I", b"\x00" + m[:3])
        return value

    def getfield(self, pkt, s):
        remain = s[3:]
        return remain, self.m2i(pkt, s)
223
224
class _SSLv2EncryptedKeyField(XStrLenField):
    """
    Encrypted key field whose representation also shows the packet's
    decryptedkey, when there is one.
    """

    def i2repr(self, pkt, x):
        parent = super(_SSLv2EncryptedKeyField, self)
        text = parent.i2repr(pkt, x)
        if pkt.decryptedkey is None:
            return text
        clear_text = parent.i2repr(pkt, pkt.decryptedkey)
        return text + "    [decryptedkey= %s]" % clear_text
233
234
class SSLv2ClientMasterKey(_SSLv2Handshake):
    """
    SSLv2 ClientMasterKey.

    On the wire: msgtype (1 byte), cipher (3 bytes), three 2-byte length
    fields, then clearkey, encryptedkey and keyarg.

    The 'decryptedkey' slot is not a wire field. It holds the plaintext
    behind 'encryptedkey': it may be passed to the constructor, it gets
    generated by post_build() when left empty, and pre_dissect() fills it
    when the session knows the server RSA key.
    """
    __slots__ = ["decryptedkey"]
    name = "SSLv2 Handshake - Client Master Key"
    fields_desc = [ByteEnumField("msgtype", 2, _sslv2_handshake_type),
                   _SSLv2CipherSuiteField("cipher", None, _tls_cipher_suites),

                   # Lengths of the three variable-size fields below.
                   FieldLenField("clearkeylen", None, fmt="!H",
                                 length_of="clearkey"),
                   FieldLenField("encryptedkeylen", None, fmt="!H",
                                 length_of="encryptedkey"),
                   FieldLenField("keyarglen", None, fmt="!H",
                                 length_of="keyarg"),

                   XStrLenField("clearkey", "",
                                length_from=lambda pkt: pkt.clearkeylen),
                   _SSLv2EncryptedKeyField("encryptedkey", "",
                                           length_from=lambda pkt: pkt.encryptedkeylen),  # noqa: E501
                   XStrLenField("keyarg", "",
                                length_from=lambda pkt: pkt.keyarglen)]

    def __init__(self, *args, **kargs):
        """
        When post_building, the packets fields are updated (this is somewhat
        non-standard). We might need these fields later, but calling __str__
        on a new packet (i.e. not dissected from a raw string) applies
        post_build to an object different from the original one... unless
        we hackishly always set self.explicit to 1.
        """
        # 'decryptedkey' is consumed here and never reaches the parent
        # constructor; it defaults to an empty string.
        self.decryptedkey = kargs.pop("decryptedkey", b"")
        super(SSLv2ClientMasterKey, self).__init__(*args, **kargs)
        self.explicit = 1

    def pre_dissect(self, s):
        """
        Locate the encrypted key inside the raw message 's' and, when the
        session holds the server RSA key, decrypt it into self.decryptedkey
        (set to None otherwise). 's' itself is returned unchanged.
        """
        # Header layout: msgtype (1) + cipher (3) + clearkeylen (2) +
        # encryptedkeylen (2) + keyarglen (2) = 10 bytes.
        clearkeylen = struct.unpack("!H", s[4:6])[0]
        encryptedkeylen = struct.unpack("!H", s[6:8])[0]
        # The encrypted key comes right after the header and the clear key.
        encryptedkeystart = 10 + clearkeylen
        encryptedkey = s[encryptedkeystart:encryptedkeystart + encryptedkeylen]
        if self.tls_session.server_rsa_key:
            self.decryptedkey = \
                self.tls_session.server_rsa_key.decrypt(encryptedkey)
        else:
            self.decryptedkey = None
        return s

    def post_build(self, pkt, pay):
        """
        Fill in whatever was left unset (cipher, keys, key argument and
        length fields), updating the packet fields along the way, and
        rebuild the whole message from these values.

        Only the first byte of 'pkt' (the message type) is always reused;
        when 'cipher' was set, its 3 bytes are taken from 'pkt' as well.
        Returns the rebuilt message followed by 'pay'.
        """
        cs_val = None
        if self.cipher is None:
            # No cipher was chosen: pick one among the suites common to
            # both Hellos, as recorded in the session.
            common_cs = self.tls_session.sslv2_common_cs
            cs_vals = get_usable_ciphersuites(common_cs, "SSLv2")
            if len(cs_vals) == 0:
                warning("No known common cipher suite between SSLv2 Hellos.")
                # Hard-coded fallback suite.
                # NOTE(review): presumably SSL_CK_DES_192_EDE3_CBC_WITH_MD5,
                # to be confirmed against crypto/suites.py.
                cs_val = 0x0700c0
                cipher = b"\x07\x00\xc0"
            else:
                cs_val = cs_vals[0]  # XXX choose the best one
                cipher = struct.pack(">BH", cs_val >> 16, cs_val & 0x00ffff)
            cs_cls = _tls_cipher_suites_cls[cs_val]
            self.cipher = cs_val
        else:
            # A cipher was set: read its 3 bytes back from the built packet.
            cipher = pkt[1:4]
            cs_val = struct.unpack("!I", b"\x00" + cipher)[0]
            if cs_val not in _tls_cipher_suites_cls:
                warning("Unknown cipher suite %d from ClientMasterKey", cs_val)
                cs_cls = None
            else:
                cs_cls = _tls_cipher_suites_cls[cs_val]

        # Key material can only be generated for a known cipher suite.
        if cs_cls:
            if (self.encryptedkey == b"" and
                    len(self.tls_session.server_certs) > 0):
                # else, the user is responsible for export slicing & encryption
                key = randstring(cs_cls.cipher_alg.key_len)

                # For export suites, all but the last 5 bytes of the key
                # are sent in the clear...
                if self.clearkey == b"" and cs_cls.kx_alg.export:
                    self.clearkey = key[:-5]

                # ...and only these last 5 bytes get encrypted. Otherwise
                # the whole key is encrypted.
                if self.decryptedkey == b"":
                    if cs_cls.kx_alg.export:
                        self.decryptedkey = key[-5:]
                    else:
                        self.decryptedkey = key

                # Encrypt with the public key of the first server certificate.
                pubkey = self.tls_session.server_certs[0].pubKey
                self.encryptedkey = pubkey.encrypt(self.decryptedkey)

            # Block ciphers get a random key argument of one block, which
            # tls_session_update() later installs as the IV.
            if self.keyarg == b"" and cs_cls.cipher_alg.type == "block":
                self.keyarg = randstring(cs_cls.cipher_alg.block_size)

        # Length fields which were explicitly set by the user are kept,
        # even when they do not match the actual data.
        clearkey = self.clearkey or b""
        if self.clearkeylen is None:
            self.clearkeylen = len(clearkey)
        clearkeylen = struct.pack("!H", self.clearkeylen)

        encryptedkey = self.encryptedkey or b""
        if self.encryptedkeylen is None:
            self.encryptedkeylen = len(encryptedkey)
        encryptedkeylen = struct.pack("!H", self.encryptedkeylen)

        keyarg = self.keyarg or b""
        if self.keyarglen is None:
            self.keyarglen = len(keyarg)
        keyarglen = struct.pack("!H", self.keyarglen)

        # Reassemble the message in wire order.
        s = (pkt[:1] + cipher +
             clearkeylen + encryptedkeylen + keyarglen +
             clearkey + encryptedkey + keyarg)
        return s + pay

    def tls_session_update(self, msg_str):
        """
        Besides registering the message, create the pending read and write
        connection states for the selected cipher suite. When the decrypted
        key is known, also set the master secret, derive the keys, install
        the IV of block ciphers and set the flags triggering the commit of
        both pending states.
        """
        super(SSLv2ClientMasterKey, self).tls_session_update(msg_str)

        s = self.tls_session
        cs_val = self.cipher
        if cs_val not in _tls_cipher_suites_cls:
            warning("Unknown cipher suite %d from ClientMasterKey", cs_val)
            cs_cls = None
        else:
            cs_cls = _tls_cipher_suites_cls[cs_val]

        # Fall back to the SSLv2 version number when the session has none.
        tls_version = s.tls_version or 0x0002
        connection_end = s.connection_end
        # The pending states carry on from the current sequence numbers.
        wcs_seq_num = s.wcs.seq_num
        s.pwcs = writeConnState(ciphersuite=cs_cls,
                                connection_end=connection_end,
                                seq_num=wcs_seq_num,
                                tls_version=tls_version)
        rcs_seq_num = s.rcs.seq_num
        s.prcs = readConnState(ciphersuite=cs_cls,
                               connection_end=connection_end,
                               seq_num=rcs_seq_num,
                               tls_version=tls_version)

        # decryptedkey is None when the message was dissected without the
        # server RSA key (see pre_dissect): no key can be derived then.
        if self.decryptedkey is not None:
            # The master secret is the clear part followed by the secret part.
            s.master_secret = self.clearkey + self.decryptedkey
            s.compute_sslv2_km_and_derive_keys()

            # For block ciphers, the key argument is used as the IV.
            if s.pwcs.cipher.type == "block":
                s.pwcs.cipher.iv = self.keyarg
            if s.prcs.cipher.type == "block":
                s.prcs.cipher.iv = self.keyarg

            s.triggered_prcs_commit = True
            s.triggered_pwcs_commit = True
381
382
383###############################################################################
384#   ServerVerify                                                              #
385###############################################################################
386
class SSLv2ServerVerify(_SSLv2Handshake):
    """
    In order to parse a ServerVerify, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Server Verify"
    fields_desc = [ByteEnumField("msgtype", 5, _sslv2_handshake_type),
                   XStrField("challenge", "")]

    def build(self, *args, **kargs):
        """
        Build the message. A challenge which was left unset or empty is
        taken from the session, provided the session holds one.
        """
        # The test used to be 'is None' only. As the field defaults to an
        # empty string, the session challenge was never used by default,
        # unlike what the Finished messages do for their own field.
        if not self.getfieldval("challenge"):
            session_challenge = self.tls_session.sslv2_challenge
            if session_challenge is not None:
                self.challenge = session_challenge
        return super(SSLv2ServerVerify, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log a runtime message when the received challenge differs from the
        one stored in the session. Nothing is checked when the session
        holds no challenge.
        """
        s = self.tls_session
        if s.sslv2_challenge is not None:
            if self.challenge != s.sslv2_challenge:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid ServerVerify received [%s]", pkt_info)  # noqa: E501
408
409
410###############################################################################
411#   RequestCertificate                                                        #
412###############################################################################
413
class SSLv2RequestCertificate(_SSLv2Handshake):
    """
    The challenge of a RequestCertificate carries no length of its own: it
    extends up to the end of the message. Parsing therefore requires that
    the exact message string be fed to the class.
    """
    name = "SSLv2 Handshake - Request Certificate"
    fields_desc = [
        ByteEnumField("msgtype", 7, _sslv2_handshake_type),
        ByteEnumField("authtype", 1, {1: "md5_with_rsa"}),
        XStrField("challenge", ""),
    ]

    def tls_session_update(self, msg_str):
        """
        Besides registering the message, store the challenge in the session
        as sslv2_challenge_clientcert.
        """
        super(SSLv2RequestCertificate, self).tls_session_update(msg_str)
        session = self.tls_session
        session.sslv2_challenge_clientcert = self.challenge
427
428
429###############################################################################
430#   ClientCertificate                                                         #
431###############################################################################
432
class SSLv2ClientCertificate(_SSLv2Handshake):
    """
    SSLv2 ClientCertificate: the client certificate followed by the
    response data, a signature computed over the session key material, the
    challenge of the RequestCertificate and the server certificate.
    """
    name = "SSLv2 Handshake - Client Certificate"
    fields_desc = [ByteEnumField("msgtype", 8, _sslv2_handshake_type),

                   ByteEnumField("certtype", 1, {1: "x509_cert"}),
                   FieldLenField("certlen", None, fmt="!H",
                                 length_of="certdata"),
                   FieldLenField("responselen", None, fmt="!H",
                                 length_of="responsedata"),

                   _SSLv2CertDataField("certdata", b"",
                                       length_from=lambda pkt: pkt.certlen),
                   _TLSSignatureField("responsedata", None,
                                      length_from=lambda pkt: pkt.responselen)]

    def _responsedata_material(self):
        """
        Return the string covered by the response signature, or None when
        the session lacks the key material, the client certificate
        challenge or the server certificate.
        """
        s = self.tls_session
        if (s.sslv2_key_material is None or
                s.sslv2_challenge_clientcert is None or
                len(s.server_certs) == 0):
            return None
        return (s.sslv2_key_material +
                s.sslv2_challenge_clientcert +
                s.server_certs[0].der)

    def build(self, *args, **kargs):
        """
        Build the message. When 'responsedata' is unset, it is signed with
        the session's client key if the session holds everything needed,
        and set to an empty string otherwise.
        """
        # A value which is already there (either provided by the user or
        # computed by a previous build) is left alone. It used to be
        # overwritten with an empty string.
        if self.getfieldval("responsedata") is None:
            m = self._responsedata_material()
            if m is not None:
                s = self.tls_session
                self.responsedata = _TLSSignature(tls_session=s)
                self.responsedata._update_sig(m, s.client_key)
            else:
                self.responsedata = b""
        return super(SSLv2ClientCertificate, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session, then check the response signature against the
        first client certificate when the session holds everything needed.
        A failed check is only logged.
        """
        self.tls_session_update(msg_str)

        s = self.tls_session
        if len(s.client_certs) > 0:
            m = self._responsedata_material()
            if m is not None:
                sig_test = self.responsedata._verify_sig(m, s.client_certs[0])
                if not sig_test:
                    pkt_info = self.firstlayer().summary()
                    log_runtime.info("TLS: invalid client CertificateVerify signature [%s]", pkt_info)  # noqa: E501

    def tls_session_update(self, msg_str):
        """
        Besides registering the message, store a non-empty certificate in
        the session as the only client certificate.
        """
        super(SSLv2ClientCertificate, self).tls_session_update(msg_str)
        if self.certdata:
            self.tls_session.client_certs = [self.certdata]
490
491
492###############################################################################
493#   Finished                                                                  #
494###############################################################################
495
class SSLv2ClientFinished(_SSLv2Handshake):
    """
    The connection id of a ClientFinished extends up to the end of the
    message, as SSLv2 offers no other way to know its length. Parsing
    therefore requires that the exact message string be fed to the class.
    """
    name = "SSLv2 Handshake - Client Finished"
    fields_desc = [
        ByteEnumField("msgtype", 3, _sslv2_handshake_type),
        XStrField("connection_id", ""),
    ]

    def build(self, *args, **kargs):
        # An empty connection id is replaced with the one of the session.
        if self.getfieldval("connection_id") == b"":
            self.connection_id = self.tls_session.sslv2_connection_id
        parent = super(SSLv2ClientFinished, self)
        return parent.build(*args, **kargs)

    def post_dissection(self, pkt):
        # Log a mismatch with the connection id known to the session;
        # nothing is checked when the session has none.
        expected = self.tls_session.sslv2_connection_id
        if expected is not None and self.connection_id != expected:
            pkt_info = pkt.firstlayer().summary()
            log_runtime.info("TLS: invalid client Finished received [%s]", pkt_info)  # noqa: E501
517
518
class SSLv2ServerFinished(_SSLv2Handshake):
    """
    The session id of a ServerFinished extends up to the end of the
    message, as SSLv2 offers no other way to know its length. Parsing
    therefore requires that the exact message string be fed to the class.
    """
    name = "SSLv2 Handshake - Server Finished"
    fields_desc = [
        ByteEnumField("msgtype", 6, _sslv2_handshake_type),
        XStrField("sid", ""),
    ]

    def build(self, *args, **kargs):
        # An empty session id is replaced with the one of the session.
        if self.getfieldval("sid") == b"":
            self.sid = self.tls_session.sid
        parent = super(SSLv2ServerFinished, self)
        return parent.build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        # Register the message, then store the received session id.
        self.tls_session_update(msg_str)
        session = self.tls_session
        session.sid = self.sid
537
538
539###############################################################################
540#   All handshake messages defined in this module                             #
541###############################################################################
542
# Packet class implementing each SSLv2 handshake message, keyed by the
# value of the "msgtype" field.
_sslv2_handshake_cls = {
    0: SSLv2Error,
    1: SSLv2ClientHello,
    2: SSLv2ClientMasterKey,
    3: SSLv2ClientFinished,
    4: SSLv2ServerHello,
    5: SSLv2ServerVerify,
    6: SSLv2ServerFinished,
    7: SSLv2RequestCertificate,
    8: SSLv2ClientCertificate,
}
548