1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# PYTHON_ARGCOMPLETE_OK
4
5# Copyright: (c) 2020 Jordan Borean (@jborean93) <jborean93@gmail.com>
6# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
7
8"""
9Script that can be used to parse a Negotiate token and output a human readable structure. You can pass in an actual
10SPNEGO token or just a raw Kerberos or NTLM token, the script should be smart enough to detect the structure of the
11input.
12"""
13
14import argparse
15import base64
16import json
17import os.path
18import re
19import struct
20import sys
21import typing
22
23from spnego._context import GSSMech
24from spnego._kerberos import (
25    KerberosV5Msg,
26    parse_enum,
27    parse_flags,
28    parse_kerberos_token,
29)
30from spnego._ntlm_raw.crypto import hmac_md5, ntowfv1, ntowfv2, rc4k
31from spnego._ntlm_raw.messages import (
32    Authenticate,
33    AvId,
34    Challenge,
35    Negotiate,
36    NegotiateFlags,
37    NTClientChallengeV2,
38    TargetInfo,
39    Version,
40)
41from spnego._spnego import InitialContextToken, NegTokenInit, NegTokenResp, unpack_token
42from spnego._text import to_bytes
43
44try:
45    import argcomplete
46except ImportError:  # pragma: nocover
47    argcomplete = None
48
49yaml: typing.Optional[typing.Any]
50try:
51    from ruamel import yaml
52except ImportError:  # pragma: nocover
53    yaml = None
54
55
56def _parse_ntlm_version(
57    version: typing.Optional[Version],
58) -> typing.Optional[typing.Dict[str, typing.Union[int, str]]]:
59    if not version:
60        return None
61
62    return {
63        'Major': version.major,
64        'Minor': version.minor,
65        'Build': version.build,
66        'Reserved': base64.b16encode(version.reserved).decode(),
67        'NTLMRevision': version.revision,
68    }
69
70
71def _parse_ntlm_target_info(
72    target_info: typing.Optional[TargetInfo],
73) -> typing.Optional[typing.List[typing.Dict[str, typing.Any]]]:
74    if target_info is None:
75        return None
76
77    text_values = [AvId.nb_computer_name, AvId.nb_domain_name, AvId.dns_computer_name, AvId.dns_domain_name,
78                   AvId.dns_tree_name, AvId.target_name]
79
80    info = []
81    for av_id, raw_value in target_info.items():
82
83        if av_id == AvId.eol:
84            value = None
85        elif av_id in text_values:
86            value = raw_value
87        elif av_id == AvId.flags:
88            value = parse_flags(raw_value)
89        elif av_id == AvId.timestamp:
90            value = str(raw_value)
91        elif av_id == AvId.single_host:
92            value = {
93                'Size': raw_value.size,
94                'Z4': raw_value.z4,
95                'CustomData': base64.b16encode(raw_value.custom_data).decode(),
96                'MachineId': base64.b16encode(raw_value.machine_id).decode(),
97            }
98        else:
99            value = base64.b16encode(raw_value).decode()
100
101        info.append({'AvId': parse_enum(av_id), 'Value': value})
102
103    return info
104
105
106def _parse_ntlm_negotiate(data: Negotiate) -> typing.Dict[str, typing.Any]:
107    b_data = data.pack()
108
109    msg = {
110        'NegotiateFlags': parse_flags(data.flags, enum_type=NegotiateFlags),
111        'DomainNameFields': {
112            'Len': struct.unpack("<H", b_data[16:18])[0],
113            'MaxLen': struct.unpack("<H", b_data[18:20])[0],
114            'BufferOffset': struct.unpack("<I", b_data[20:24])[0],
115        },
116        'WorkstationFields': {
117            'Len': struct.unpack("<H", b_data[24:26])[0],
118            'MaxLen': struct.unpack("<H", b_data[26:28])[0],
119            'BufferOffset': struct.unpack("<I", b_data[28:32])[0],
120        },
121        'Version': _parse_ntlm_version(data.version),
122        'Payload': {
123            'DomainName': data.domain_name,
124            'Workstation': data.workstation,
125        }
126    }
127
128    return msg
129
130
131def _parse_ntlm_challenge(data: Challenge) -> typing.Dict[str, typing.Any]:
132    b_data = data.pack()
133
134    msg = {
135        'TargetNameFields': {
136            'Len': struct.unpack("<H", b_data[12:14])[0],
137            'MaxLen': struct.unpack("<H", b_data[14:16])[0],
138            'BufferOffset': struct.unpack("<I", b_data[16:20])[0],
139        },
140        'NegotiateFlags': parse_flags(data.flags, enum_type=NegotiateFlags),
141        'ServerChallenge': base64.b16encode(b_data[24:32]).decode(),
142        'Reserved': base64.b16encode(b_data[32:40]).decode(),
143        'TargetInfoFields': {
144            'Len': struct.unpack("<H", b_data[40:42])[0],
145            'MaxLen': struct.unpack("<H", b_data[42:44])[0],
146            'BufferOffset': struct.unpack("<I", b_data[44:48])[0],
147        },
148        'Version': _parse_ntlm_version(data.version),
149        'Payload': {
150            'TargetName': data.target_name,
151            'TargetInfo': _parse_ntlm_target_info(data.target_info),
152        },
153    }
154
155    return msg
156
157
158def _parse_ntlm_authenticate(data: Authenticate, password: typing.Optional[str]) -> typing.Dict[str, typing.Any]:
159    b_data = data.pack()
160
161    msg: typing.Dict[str, typing.Any] = {
162        'LmChallengeResponseFields': {
163            'Len': struct.unpack("<H", b_data[12:14])[0],
164            'MaxLen': struct.unpack("<H", b_data[14:16])[0],
165            'BufferOffset': struct.unpack("<I", b_data[16:20])[0],
166        },
167        'NtChallengeResponseFields': {
168            'Len': struct.unpack("<H", b_data[20:22])[0],
169            'MaxLen': struct.unpack("<H", b_data[22:24])[0],
170            'BufferOffset': struct.unpack("<I", b_data[24:28])[0],
171        },
172        'DomainNameFields': {
173            'Len': struct.unpack("<H", b_data[28:30])[0],
174            'MaxLen': struct.unpack("<H", b_data[30:32])[0],
175            'BufferOffset': struct.unpack("<I", b_data[32:36])[0],
176        },
177        'UserNameFields': {
178            'Len': struct.unpack("<H", b_data[36:38])[0],
179            'MaxLen': struct.unpack("<H", b_data[38:40])[0],
180            'BufferOffset': struct.unpack("<I", b_data[40:44])[0],
181        },
182        'WorkstationFields': {
183            'Len': struct.unpack("<H", b_data[44:46])[0],
184            'MaxLen': struct.unpack("<H", b_data[46:48])[0],
185            'BufferOffset': struct.unpack("<I", b_data[48:52])[0],
186        },
187        'EncryptedRandomSessionKeyFields': {
188            'Len': struct.unpack("<H", b_data[52:54])[0],
189            'MaxLen': struct.unpack("<H", b_data[54:56])[0],
190            'BufferOffset': struct.unpack("<I", b_data[56:60])[0],
191        },
192        'NegotiateFlags': parse_flags(data.flags, enum_type=NegotiateFlags),
193        'Version': _parse_ntlm_version(data.version),
194        'MIC': base64.b16encode(data.mic).decode() if data.mic else None,
195        'Payload': {
196            'LmChallengeResponse': None,
197            'NtChallengeResponse': None,
198            'DomainName': data.domain_name,
199            'UserName': data.user_name,
200            'Workstation': data.workstation,
201            'EncryptedRandomSessionKey': None,
202        },
203    }
204
205    key_exchange_key = None
206    lm_response_data = data.lm_challenge_response
207    nt_response_data = data.nt_challenge_response
208
209    if lm_response_data:
210        lm_response: typing.Dict[str, typing.Any] = {
211            'ResponseType': None,
212            'LMProofStr': None,
213        }
214
215        if not nt_response_data or len(nt_response_data) == 24:
216            lm_response['ResponseType'] = 'LMv1'
217            lm_response['LMProofStr'] = base64.b16encode(lm_response_data).decode()
218
219        else:
220            lm_response['ResponseType'] = 'LMv2'
221            lm_response['LMProofStr'] = base64.b16encode(lm_response_data[:16]).decode()
222            lm_response['ChallengeFromClient'] = base64.b16encode(lm_response_data[16:]).decode()
223
224        msg['Payload']['LmChallengeResponse'] = lm_response
225
226    if nt_response_data:
227        nt_response: typing.Dict[str, typing.Any] = {
228            'ResponseType': None,
229            'NTProofStr': None,
230        }
231
232        if len(nt_response_data) == 24:
233            nt_response['ResponseType'] = 'NTLMv1'
234            nt_response['NTProofStr'] = base64.b16encode(nt_response_data).decode()
235
236            # TODO: need to get a sane way to include the server challenge for ESS KXKEY.
237            # if password and lm_response_data:
238            #     session_base_key = hashlib.new('md4', ntowfv1(password)).digest()
239            #     lmowf = lmowfv1(password)
240            #     if data.flags & NegotiateFlags.extended_session_security == 0:
241            #         key_exchange_key = kxkey(data.flags, session_base_key, lmowf, lm_response_data, b"")
242
243        else:
244            nt_proof_str = nt_response_data[:16]
245            nt_response['ResponseType'] = 'NTLMv2'
246            nt_response['NTProofStr'] = base64.b16encode(nt_proof_str).decode()
247
248            challenge = NTClientChallengeV2.unpack(nt_response_data[16:])
249            b_challenge = nt_response_data[16:]
250
251            nt_response['ClientChallenge'] = {
252                'RespType': challenge.resp_type,
253                'HiRespType': challenge.hi_resp_type,
254                'Reserved1': struct.unpack("<H", b_challenge[2:4])[0],
255                'Reserved2': struct.unpack("<I", b_challenge[4:8])[0],
256                'TimeStamp': str(challenge.time_stamp),
257                'ChallengeFromClient': base64.b16encode(challenge.challenge_from_client).decode(),
258                'Reserved3': struct.unpack("<I", b_challenge[24:28])[0],
259                'AvPairs': _parse_ntlm_target_info(challenge.av_pairs),
260                'Reserved4': struct.unpack("<I", b_challenge[-4:])[0],
261            }
262
263            if password:
264                response_key_nt = ntowfv2(msg['Payload']['UserName'], ntowfv1(password), msg['Payload']['DomainName'])
265                key_exchange_key = hmac_md5(response_key_nt, nt_proof_str)
266
267        msg['Payload']['NtChallengeResponse'] = nt_response
268
269    if data.encrypted_random_session_key:
270        msg['Payload']['EncryptedRandomSessionKey'] = base64.b16encode(data.encrypted_random_session_key).decode()
271
272    if data.flags & NegotiateFlags.key_exch and (data.flags & NegotiateFlags.sign or data.flags & NegotiateFlags.seal):
273        session_key = None
274        if key_exchange_key:
275            session_key = rc4k(key_exchange_key, typing.cast(bytes, data.encrypted_random_session_key))
276
277    else:
278        session_key = key_exchange_key
279
280    msg['SessionKey'] = base64.b16encode(session_key).decode() if session_key else 'Failed to derive'
281
282    return msg
283
284
285def _parse_spnego_init(
286    data: NegTokenInit,
287    secret: typing.Optional[str] = None,
288    encoding: typing.Optional[str] = None,
289) -> typing.Dict[str, typing.Any]:
290    mech_types = [parse_enum(m, enum_type=GSSMech) for m in data.mech_types] \
291        if data.mech_types else None
292
293    mech_token = None
294    if data.mech_token:
295        mech_token = parse_token(data.mech_token, secret=secret, encoding=encoding)
296
297    encoding = encoding or 'utf-8'
298
299    msg = {
300        'mechTypes': mech_types,
301        'reqFlags': parse_flags(data.req_flags) if data.req_flags is not None else None,
302        'mechToken': mech_token,
303        'mechListMIC': base64.b16encode(data.mech_list_mic).decode() if data.mech_list_mic is not None else None,
304    }
305
306    if data.hint_name or data.hint_address:
307        # This is a NegTokenInit2 structure.
308        msg['negHints'] = {
309            'hintName': data.hint_name.decode(encoding) if data.hint_name else None,
310            'hintAddress': data.hint_address.decode(encoding) if data.hint_address else None,
311        }
312
313    return msg
314
315
316def _parse_spnego_resp(
317    data: NegTokenResp,
318    secret: typing.Optional[str] = None,
319    encoding: typing.Optional[str] = None,
320) -> typing.Dict[str, typing.Any]:
321    supported_mech = parse_enum(data.supported_mech, enum_type=GSSMech) if data.supported_mech else None
322
323    response_token = None
324    if data.response_token:
325        response_token = parse_token(data.response_token, secret=secret, encoding=encoding)
326
327    msg = {
328        'negState': parse_enum(data.neg_state) if data.neg_state is not None else None,
329        'supportedMech': supported_mech,
330        'responseToken': response_token,
331        'mechListMIC': base64.b16encode(data.mech_list_mic).decode() if data.mech_list_mic is not None else None,
332    }
333    return msg
334
335
336def main(args: typing.List[str]) -> None:
337    """Main program entry point."""
338    parsed_args = parse_args(args)
339
340    if parsed_args.token:
341        b_data = to_bytes(parsed_args.token)
342    else:
343        if parsed_args.file:
344            file_path = os.path.abspath(os.path.expanduser(os.path.expandvars(parsed_args.file)))
345            b_file_path = to_bytes(file_path)
346            if not os.path.exists(b_file_path):
347                raise ValueError("Cannot find file at path '%s'" % file_path)
348
349            with open(b_file_path, mode='rb') as fd:
350                b_data = fd.read()
351        else:
352            b_data = sys.stdin.buffer.read()
353
354    if re.match(b'^[a-fA-F0-9\\s]+$', b_data):
355        # Input data was a hex string.
356        b_data = base64.b16decode(re.sub(b'[\\s]', b'', b_data.strip().upper()))
357    if re.match(b'^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$', b_data):
358        # Input data was a base64 string.
359        b_data = base64.b64decode(b_data.strip())
360
361    token_info = parse_token(b_data, secret=parsed_args.secret, encoding=parsed_args.encoding)
362
363    if parsed_args.output_format == 'yaml':
364        y = yaml.YAML()  # type: ignore
365        y.default_flow_style = False
366        y.dump(token_info, sys.stdout)
367    else:
368        print(json.dumps(token_info, indent=4))
369
370
371def parse_args(args: typing.List[str]) -> argparse.Namespace:
372    """Parse and return args."""
373    parser = argparse.ArgumentParser(description='Parse Microsoft authentication tokens into a human readable format.')
374
375    data = parser.add_mutually_exclusive_group()
376
377    data.add_argument('-t', '--token',
378                      dest='token',
379                      help='Raw base64 encoded or hex string token as a command line argument.')
380
381    data.add_argument('-f', '--file',
382                      default='',
383                      dest='file',
384                      help='Path to file that contains raw bytes, base64, or hex string of token to parse, Defaults '
385                           'to reading from stdin if neither -t or -f is specified.')
386
387    parser.add_argument('--encoding',
388                        dest='encoding',
389                        help="The encoding to use when trying to decode text fields from bytes in tokens that don't "
390                             "have a negotiated encoding. This defaults to 'windows-1252' for NTLM tokens and 'utf-8' "
391                             "for Kerberos/SPNEGO tokens.")
392
393    parser.add_argument('--format', '--output-format',
394                        choices=['json', 'yaml'],
395                        default='json',
396                        dest='output_format',
397                        type=lambda s: s.lower(),
398                        help='Set the output format of the token, default is (json). Using yaml requires the '
399                             'ruamel.yaml Python library to be installed ''pip install pyspnego[yaml]''.')
400
401    parser.add_argument('--secret', '--password',
402                        dest='secret',
403                        default=None,
404                        help='Optional info that is the secret information for a protocol that can be used to decrypt '
405                             'encrypted fields and/or derive the unique session key in the exchange. This is '
406                             'currently only supported by NTLM tokens to generate the session key.')
407
408    if argcomplete:
409        argcomplete.autocomplete(parser)
410
411    parsed_args = parser.parse_args(args)
412
413    if parsed_args.output_format == 'yaml' and not yaml:
414        raise ValueError('Cannot output as yaml as ruamel.yaml is not installed.')
415
416    return parsed_args
417
418
419def parse_token(
420    b_data: bytes,
421    secret: typing.Optional[str] = None,
422    encoding: typing.Optional[str] = None,
423    mech: typing.Optional[typing.Union[str, GSSMech]] = None
424) -> typing.Dict[str, typing.Any]:
425    """
426    :param b_data: A byte string of the token to parse. This can be a NTLM or GSSAPI (SPNEGO/Kerberos) token.
427    :param secret: The secret data used to decrypt fields and/or derive session keys.
428    :param encoding: The encoding to use for token fields that represent text. This is only used for fields where there
429        is no negotiation for the encoding of that particular field. Defaults to 'windows-1252' for NTLM and 'utf-8'
430        for Kerberos.
431    :return: A dict containing the parsed token data.
432    """
433    gss_mech: typing.Optional[GSSMech] = None
434    if mech and not isinstance(mech, GSSMech):
435        gss_mech = GSSMech.from_oid(mech)
436
437    try:
438        token = unpack_token(b_data, mech=gss_mech, unwrap=True, encoding=encoding)
439    except Exception as e:
440        return {
441            'MessageType': 'Unknown - Failed to parse see Data for more details.',
442            'Data': 'Failed to parse token: %s' % str(e),
443            'RawData': base64.b16encode(b_data).decode(),
444        }
445
446    msg_type = 'Unknown'
447    data: typing.Union[str, typing.Dict[str, typing.Any]] = 'Failed to parse SPNEGO token due to unknown mech type'
448
449    # SPNEGO messages.
450    if isinstance(token, InitialContextToken):
451        msg_type = 'SPNEGO InitialContextToken'
452        data = {
453            'thisMech': parse_enum(token.this_mech, enum_type=GSSMech),
454            'innerContextToken': parse_token(token.inner_context_token, mech=token.this_mech, secret=secret,
455                                             encoding=encoding),
456        }
457
458    elif isinstance(token, NegTokenInit):
459        data = _parse_spnego_init(token, secret, encoding)
460        if 'negHints' in data:
461            msg_type = 'SPNEGO NegTokenInit2'
462
463        else:
464            msg_type = 'SPNEGO NegTokenInit'
465
466    elif isinstance(token, NegTokenResp):
467        msg_type = 'SPNEGO NegTokenResp'
468        data = _parse_spnego_resp(token, secret, encoding)
469
470    # NTLM messages.
471    elif isinstance(token, (Negotiate, Challenge, Authenticate)):
472        msg_type = parse_enum(token.MESSAGE_TYPE)
473
474        if isinstance(token, Negotiate):
475            data = _parse_ntlm_negotiate(token)
476
477        elif isinstance(token, Challenge):
478            data = _parse_ntlm_challenge(token)
479
480        else:
481            data = _parse_ntlm_authenticate(token, secret)
482
483    # Kerberos messages.
484    elif isinstance(token, KerberosV5Msg):
485        msg_type = parse_enum(token.MESSAGE_TYPE)
486        data = parse_kerberos_token(token, secret, encoding)
487
488    return {
489        'MessageType': msg_type,
490        'Data': data,
491        'RawData': base64.b16encode(b_data).decode(),
492    }
493
494
495if __name__ == '__main__':  # pragma: nocover
496    main(sys.argv[1:])
497