#!/usr/bin/env python
# -*- coding: utf-8 -*-
# PYTHON_ARGCOMPLETE_OK

# Copyright: (c) 2020 Jordan Borean (@jborean93) <jborean93@gmail.com>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)

"""
Script that can be used to parse a Negotiate token and output a human readable structure. You can pass in an actual
SPNEGO token or just a raw Kerberos or NTLM token, the script should be smart enough to detect the structure of the
input.
"""

import argparse
import base64
import json
import os.path
import re
import struct
import sys
import typing

from spnego._context import GSSMech
from spnego._kerberos import (
    KerberosV5Msg,
    parse_enum,
    parse_flags,
    parse_kerberos_token,
)
from spnego._ntlm_raw.crypto import hmac_md5, ntowfv1, ntowfv2, rc4k
from spnego._ntlm_raw.messages import (
    Authenticate,
    AvId,
    Challenge,
    Negotiate,
    NegotiateFlags,
    NTClientChallengeV2,
    TargetInfo,
    Version,
)
from spnego._spnego import InitialContextToken, NegTokenInit, NegTokenResp, unpack_token
from spnego._text import to_bytes

try:
    import argcomplete
except ImportError:  # pragma: nocover
    argcomplete = None

yaml: typing.Optional[typing.Any]
try:
    from ruamel import yaml
except ImportError:  # pragma: nocover
    yaml = None


def _to_hex(value: bytes) -> str:
    """Return value as an upper case hex string."""
    return base64.b16encode(value).decode()


def _parse_ntlm_fields(b_data: bytes, offset: int) -> typing.Dict[str, int]:
    """Read a payload field descriptor from a packed NTLM message.

    The descriptor is read as little endian: a 2 byte Len, a 2 byte MaxLen, and a 4 byte BufferOffset.

    :param b_data: The packed NTLM message.
    :param offset: The byte offset in b_data where the descriptor starts.
    :return: A dict with the Len, MaxLen, and BufferOffset values.
    """
    length, max_length, buffer_offset = struct.unpack_from("<HHI", b_data, offset)
    return {
        'Len': length,
        'MaxLen': max_length,
        'BufferOffset': buffer_offset,
    }


def _parse_ntlm_version(
        version: typing.Optional[Version],
) -> typing.Optional[typing.Dict[str, typing.Union[int, str]]]:
    """Convert an NTLM Version structure to a dict.

    :param version: The version structure or None if the message did not contain one.
    :return: The dict of the version fields with Reserved as a hex string, or None if version was not set.
    """
    if not version:
        return None

    return {
        'Major': version.major,
        'Minor': version.minor,
        'Build': version.build,
        'Reserved': _to_hex(version.reserved),
        'NTLMRevision': version.revision,
    }


def _parse_ntlm_target_info(
        target_info: typing.Optional[TargetInfo],
) -> typing.Optional[typing.List[typing.Dict[str, typing.Any]]]:
    """Convert the NTLM TargetInfo AV pairs to a list of dicts.

    :param target_info: The target info to parse or None if not present.
    :return: A list with an 'AvId' and 'Value' entry for each pair in the order they appear in target_info, or None
        if target_info was None.
    """
    if target_info is None:
        return None

    # These values are output as is, the remaining known ids need their own formatting.
    text_values = [AvId.nb_computer_name, AvId.nb_domain_name, AvId.dns_computer_name, AvId.dns_domain_name,
                   AvId.dns_tree_name, AvId.target_name]

    info = []
    for av_id, raw_value in target_info.items():
        value: typing.Any
        if av_id == AvId.eol:
            value = None
        elif av_id in text_values:
            value = raw_value
        elif av_id == AvId.flags:
            value = parse_flags(raw_value)
        elif av_id == AvId.timestamp:
            value = str(raw_value)
        elif av_id == AvId.single_host:
            value = {
                'Size': raw_value.size,
                'Z4': raw_value.z4,
                'CustomData': _to_hex(raw_value.custom_data),
                'MachineId': _to_hex(raw_value.machine_id),
            }
        else:
            # Anything else is output as the hex string of the raw value.
            value = _to_hex(raw_value)

        info.append({'AvId': parse_enum(av_id), 'Value': value})

    return info


def _parse_ntlm_negotiate(data: Negotiate) -> typing.Dict[str, typing.Any]:
    """Convert an NTLM Negotiate message to a dict.

    The field descriptors are read from fixed offsets of the bytes returned by data.pack().

    :param data: The Negotiate message to parse.
    :return: The dict of the message fields.
    """
    b_data = data.pack()

    return {
        'NegotiateFlags': parse_flags(data.flags, enum_type=NegotiateFlags),
        'DomainNameFields': _parse_ntlm_fields(b_data, 16),
        'WorkstationFields': _parse_ntlm_fields(b_data, 24),
        'Version': _parse_ntlm_version(data.version),
        'Payload': {
            'DomainName': data.domain_name,
            'Workstation': data.workstation,
        },
    }


def _parse_ntlm_challenge(data: Challenge) -> typing.Dict[str, typing.Any]:
    """Convert an NTLM Challenge message to a dict.

    The field descriptors, server challenge, and reserved bytes are read from fixed offsets of the bytes returned
    by data.pack().

    :param data: The Challenge message to parse.
    :return: The dict of the message fields.
    """
    b_data = data.pack()

    return {
        'TargetNameFields': _parse_ntlm_fields(b_data, 12),
        'NegotiateFlags': parse_flags(data.flags, enum_type=NegotiateFlags),
        'ServerChallenge': _to_hex(b_data[24:32]),
        'Reserved': _to_hex(b_data[32:40]),
        'TargetInfoFields': _parse_ntlm_fields(b_data, 40),
        'Version': _parse_ntlm_version(data.version),
        'Payload': {
            'TargetName': data.target_name,
            'TargetInfo': _parse_ntlm_target_info(data.target_info),
        },
    }


def _parse_ntlm_authenticate(data: Authenticate, password: typing.Optional[str]) -> typing.Dict[str, typing.Any]:
    """Convert an NTLM Authenticate message to a dict.

    The field descriptors are read from fixed offsets of the bytes returned by data.pack(). An NT response that is
    24 bytes long is treated as NTLMv1, anything else as NTLMv2. The 'SessionKey' entry can only be derived for an
    NTLMv2 response when password is set, otherwise it is reported as 'Failed to derive'.

    :param data: The Authenticate message to parse.
    :param password: The password of the user in the message, used to derive the session key.
    :return: The dict of the message fields.
    """
    b_data = data.pack()

    msg: typing.Dict[str, typing.Any] = {
        'LmChallengeResponseFields': _parse_ntlm_fields(b_data, 12),
        'NtChallengeResponseFields': _parse_ntlm_fields(b_data, 20),
        'DomainNameFields': _parse_ntlm_fields(b_data, 28),
        'UserNameFields': _parse_ntlm_fields(b_data, 36),
        'WorkstationFields': _parse_ntlm_fields(b_data, 44),
        'EncryptedRandomSessionKeyFields': _parse_ntlm_fields(b_data, 52),
        'NegotiateFlags': parse_flags(data.flags, enum_type=NegotiateFlags),
        'Version': _parse_ntlm_version(data.version),
        'MIC': _to_hex(data.mic) if data.mic else None,
        'Payload': {
            'LmChallengeResponse': None,
            'NtChallengeResponse': None,
            'DomainName': data.domain_name,
            'UserName': data.user_name,
            'Workstation': data.workstation,
            'EncryptedRandomSessionKey': None,
        },
    }

    key_exchange_key: typing.Optional[bytes] = None
    lm_response_data = data.lm_challenge_response
    nt_response_data = data.nt_challenge_response

    if lm_response_data:
        lm_response: typing.Dict[str, typing.Any] = {
            'ResponseType': None,
            'LMProofStr': None,
        }

        if not nt_response_data or len(nt_response_data) == 24:
            lm_response['ResponseType'] = 'LMv1'
            lm_response['LMProofStr'] = _to_hex(lm_response_data)

        else:
            # The first 16 bytes are the proof string, the remaining bytes are the client challenge.
            lm_response['ResponseType'] = 'LMv2'
            lm_response['LMProofStr'] = _to_hex(lm_response_data[:16])
            lm_response['ChallengeFromClient'] = _to_hex(lm_response_data[16:])

        msg['Payload']['LmChallengeResponse'] = lm_response

    if nt_response_data:
        nt_response: typing.Dict[str, typing.Any] = {
            'ResponseType': None,
            'NTProofStr': None,
        }

        if len(nt_response_data) == 24:
            nt_response['ResponseType'] = 'NTLMv1'
            nt_response['NTProofStr'] = _to_hex(nt_response_data)

            # TODO: need to get a sane way to include the server challenge for ESS KXKEY.
            # if password and lm_response_data:
            #     session_base_key = hashlib.new('md4', ntowfv1(password)).digest()
            #     lmowf = lmowfv1(password)
            #     if data.flags & NegotiateFlags.extended_session_security == 0:
            #         key_exchange_key = kxkey(data.flags, session_base_key, lmowf, lm_response_data, b"")

        else:
            nt_proof_str = nt_response_data[:16]
            nt_response['ResponseType'] = 'NTLMv2'
            nt_response['NTProofStr'] = _to_hex(nt_proof_str)

            # The parsed challenge does not expose the reserved fields so they are read from the raw bytes.
            challenge = NTClientChallengeV2.unpack(nt_response_data[16:])
            b_challenge = nt_response_data[16:]

            nt_response['ClientChallenge'] = {
                'RespType': challenge.resp_type,
                'HiRespType': challenge.hi_resp_type,
                'Reserved1': struct.unpack("<H", b_challenge[2:4])[0],
                'Reserved2': struct.unpack("<I", b_challenge[4:8])[0],
                'TimeStamp': str(challenge.time_stamp),
                'ChallengeFromClient': _to_hex(challenge.challenge_from_client),
                'Reserved3': struct.unpack("<I", b_challenge[24:28])[0],
                'AvPairs': _parse_ntlm_target_info(challenge.av_pairs),
                'Reserved4': struct.unpack("<I", b_challenge[-4:])[0],
            }

            if password:
                response_key_nt = ntowfv2(msg['Payload']['UserName'], ntowfv1(password), msg['Payload']['DomainName'])
                key_exchange_key = hmac_md5(response_key_nt, nt_proof_str)

        msg['Payload']['NtChallengeResponse'] = nt_response

    encrypted_session_key = data.encrypted_random_session_key
    if encrypted_session_key:
        msg['Payload']['EncryptedRandomSessionKey'] = _to_hex(encrypted_session_key)

    session_key: typing.Optional[bytes]
    if data.flags & NegotiateFlags.key_exch and (data.flags & NegotiateFlags.sign or data.flags & NegotiateFlags.seal):
        # The flags may claim a key exchange on a token that has no encrypted key in the payload. Only decrypt
        # when both inputs are present so such a token is reported as 'Failed to derive' rather than failing.
        session_key = None
        if key_exchange_key and encrypted_session_key:
            session_key = rc4k(key_exchange_key, encrypted_session_key)

    else:
        session_key = key_exchange_key

    msg['SessionKey'] = _to_hex(session_key) if session_key else 'Failed to derive'

    return msg


def _parse_spnego_init(
        data: NegTokenInit,
        secret: typing.Optional[str] = None,
        encoding: typing.Optional[str] = None,
) -> typing.Dict[str, typing.Any]:
    """Convert a SPNEGO NegTokenInit message to a dict.

    :param data: The NegTokenInit message to parse.
    :param secret: The secret passed to parse_token when parsing the inner mechToken.
    :param encoding: The encoding passed to parse_token when parsing the inner mechToken and used to decode the
        negHints values. Defaults to 'utf-8' for the negHints values.
    :return: The dict of the message fields, 'negHints' is only present if a hint name or address is set.
    """
    mech_types = None
    if data.mech_types:
        mech_types = [parse_enum(m, enum_type=GSSMech) for m in data.mech_types]

    # The encoding is passed as is, without the utf-8 default, so the inner token parser can pick its own default.
    mech_token = None
    if data.mech_token:
        mech_token = parse_token(data.mech_token, secret=secret, encoding=encoding)

    encoding = encoding or 'utf-8'

    msg = {
        'mechTypes': mech_types,
        'reqFlags': parse_flags(data.req_flags) if data.req_flags is not None else None,
        'mechToken': mech_token,
        'mechListMIC': base64.b16encode(data.mech_list_mic).decode() if data.mech_list_mic is not None else None,
    }

    if data.hint_name or data.hint_address:
        # This is a NegTokenInit2 structure.
        msg['negHints'] = {
            'hintName': data.hint_name.decode(encoding) if data.hint_name else None,
            'hintAddress': data.hint_address.decode(encoding) if data.hint_address else None,
        }

    return msg


def _parse_spnego_resp(
        data: NegTokenResp,
        secret: typing.Optional[str] = None,
        encoding: typing.Optional[str] = None,
) -> typing.Dict[str, typing.Any]:
    """Convert a SPNEGO NegTokenResp message to a dict.

    :param data: The NegTokenResp message to parse.
    :param secret: The secret passed to parse_token when parsing the inner responseToken.
    :param encoding: The encoding passed to parse_token when parsing the inner responseToken.
    :return: The dict of the message fields.
    """
    supported_mech = parse_enum(data.supported_mech, enum_type=GSSMech) if data.supported_mech else None

    response_token = None
    if data.response_token:
        response_token = parse_token(data.response_token, secret=secret, encoding=encoding)

    return {
        'negState': parse_enum(data.neg_state) if data.neg_state is not None else None,
        'supportedMech': supported_mech,
        'responseToken': response_token,
        'mechListMIC': base64.b16encode(data.mech_list_mic).decode() if data.mech_list_mic is not None else None,
    }


def main(args: typing.List[str]) -> None:
    """Main program entry point.

    Reads the token from the --token argument, the --file path, or stdin, decodes it if it is a hex or base64
    string, and prints the parsed structure to stdout as json or yaml.

    :param args: The command line arguments, excluding the program name.
    :raises ValueError: If the path specified by --file does not exist.
    """
    parsed_args = parse_args(args)

    if parsed_args.token:
        b_data = to_bytes(parsed_args.token)
    elif parsed_args.file:
        file_path = os.path.abspath(os.path.expanduser(os.path.expandvars(parsed_args.file)))
        b_file_path = to_bytes(file_path)
        if not os.path.exists(b_file_path):
            raise ValueError("Cannot find file at path '%s'" % file_path)

        with open(b_file_path, mode='rb') as fd:
            b_data = fd.read()
    else:
        b_data = sys.stdin.buffer.read()

    # The input is decoded at most once. The bytes that result from the hex decoding must not be tested against
    # the base64 pattern as they could match it by chance and be decoded a second time.
    if re.match(b'^[a-fA-F0-9\\s]+$', b_data):
        # Input data was a hex string.
        b_data = base64.b16decode(re.sub(b'[\\s]', b'', b_data.strip().upper()))
    elif re.match(b'^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$', b_data):
        # Input data was a base64 string.
        b_data = base64.b64decode(b_data.strip())

    token_info = parse_token(b_data, secret=parsed_args.secret, encoding=parsed_args.encoding)

    if parsed_args.output_format == 'yaml':
        y = yaml.YAML()  # type: ignore
        y.default_flow_style = False
        y.dump(token_info, sys.stdout)
    else:
        print(json.dumps(token_info, indent=4))


def parse_args(args: typing.List[str]) -> argparse.Namespace:
    """Parse and return args.

    :param args: The command line arguments, excluding the program name.
    :return: The parsed arguments.
    :raises ValueError: If the yaml output format was requested but ruamel.yaml could not be imported.
    """
    parser = argparse.ArgumentParser(description='Parse Microsoft authentication tokens into a human readable format.')

    data = parser.add_mutually_exclusive_group()

    data.add_argument('-t', '--token',
                      dest='token',
                      help='Raw base64 encoded or hex string token as a command line argument.')

    data.add_argument('-f', '--file',
                      default='',
                      dest='file',
                      help='Path to file that contains raw bytes, base64, or hex string of token to parse, Defaults '
                           'to reading from stdin if neither -t or -f is specified.')

    parser.add_argument('--encoding',
                        dest='encoding',
                        help="The encoding to use when trying to decode text fields from bytes in tokens that don't "
                             "have a negotiated encoding. This defaults to 'windows-1252' for NTLM tokens and 'utf-8' "
                             "for Kerberos/SPNEGO tokens.")

    # The pip command is in a double quoted literal so the single quotes around it appear in the help text. Two
    # single quotes in a row do not escape a quote, they end one literal and start the next.
    parser.add_argument('--format', '--output-format',
                        choices=['json', 'yaml'],
                        default='json',
                        dest='output_format',
                        type=lambda s: s.lower(),
                        help='Set the output format of the token, default is (json). Using yaml requires the '
                             "ruamel.yaml Python library to be installed 'pip install pyspnego[yaml]'.")

    parser.add_argument('--secret', '--password',
                        dest='secret',
                        default=None,
                        help='Optional info that is the secret information for a protocol that can be used to decrypt '
                             'encrypted fields and/or derive the unique session key in the exchange. This is '
                             'currently only supported by NTLM tokens to generate the session key.')

    if argcomplete:
        argcomplete.autocomplete(parser)

    parsed_args = parser.parse_args(args)

    if parsed_args.output_format == 'yaml' and not yaml:
        raise ValueError('Cannot output as yaml as ruamel.yaml is not installed.')

    return parsed_args


def parse_token(
        b_data: bytes,
        secret: typing.Optional[str] = None,
        encoding: typing.Optional[str] = None,
        mech: typing.Optional[typing.Union[str, GSSMech]] = None
) -> typing.Dict[str, typing.Any]:
    """
    :param b_data: A byte string of the token to parse. This can be a NTLM or GSSAPI (SPNEGO/Kerberos) token.
    :param secret: The secret data used to decrypt fields and/or derive session keys.
    :param encoding: The encoding to use for token fields that represent text. This is only used for fields where there
        is no negotiation for the encoding of that particular field. Defaults to 'windows-1252' for NTLM and 'utf-8'
        for Kerberos.
    :param mech: The mech of the token that is passed to unpack_token. This can be a GSSMech value or an OID string
        which is converted with GSSMech.from_oid.
    :return: A dict containing the parsed token data.
    """
    # A GSSMech value is used as is, only an OID string needs to be converted.
    gss_mech: typing.Optional[GSSMech] = None
    if isinstance(mech, GSSMech):
        gss_mech = mech
    elif mech:
        gss_mech = GSSMech.from_oid(mech)

    try:
        token = unpack_token(b_data, mech=gss_mech, unwrap=True, encoding=encoding)
    except Exception as e:
        # Any failure to unpack is reported in the output instead of being raised.
        return {
            'MessageType': 'Unknown - Failed to parse see Data for more details.',
            'Data': 'Failed to parse token: %s' % str(e),
            'RawData': base64.b16encode(b_data).decode(),
        }

    msg_type = 'Unknown'
    data: typing.Union[str, typing.Dict[str, typing.Any]] = 'Failed to parse SPNEGO token due to unknown mech type'

    # SPNEGO messages.
    if isinstance(token, InitialContextToken):
        msg_type = 'SPNEGO InitialContextToken'
        data = {
            'thisMech': parse_enum(token.this_mech, enum_type=GSSMech),
            'innerContextToken': parse_token(token.inner_context_token, mech=token.this_mech, secret=secret,
                                             encoding=encoding),
        }

    elif isinstance(token, NegTokenInit):
        data = _parse_spnego_init(token, secret, encoding)
        if 'negHints' in data:
            msg_type = 'SPNEGO NegTokenInit2'

        else:
            msg_type = 'SPNEGO NegTokenInit'

    elif isinstance(token, NegTokenResp):
        msg_type = 'SPNEGO NegTokenResp'
        data = _parse_spnego_resp(token, secret, encoding)

    # NTLM messages.
    elif isinstance(token, (Negotiate, Challenge, Authenticate)):
        msg_type = parse_enum(token.MESSAGE_TYPE)

        if isinstance(token, Negotiate):
            data = _parse_ntlm_negotiate(token)

        elif isinstance(token, Challenge):
            data = _parse_ntlm_challenge(token)

        else:
            data = _parse_ntlm_authenticate(token, secret)

    # Kerberos messages.
    elif isinstance(token, KerberosV5Msg):
        msg_type = parse_enum(token.MESSAGE_TYPE)
        data = parse_kerberos_token(token, secret, encoding)

    return {
        'MessageType': msg_type,
        'Data': data,
        'RawData': base64.b16encode(b_data).decode(),
    }


if __name__ == '__main__':  # pragma: nocover
    main(sys.argv[1:])