1# Copyright: (c) 2020, Jordan Borean (@jborean93) <jborean93@gmail.com> 2# MIT License (see LICENSE or https://opensource.org/licenses/MIT) 3 4import collections 5import datetime 6import enum 7import io 8import re 9import struct 10import typing 11 12from spnego._text import to_text 13from spnego._version import __version__ as pyspnego_version 14 15 16class NegotiateFlags(enum.IntFlag): 17 """NTLM Negotiation flags. 18 19 Used during NTLM negotiation to negotiate the capabilities between the client and server. 20 21 .. _NEGOTIATE: 22 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/99d90ff4-957f-4c8a-80e4-5bfe5a9a9832 23 """ 24 key_56 = 0x80000000 25 key_exch = 0x40000000 26 key_128 = 0x20000000 27 r1 = 0x10000000 28 r2 = 0x08000000 29 r3 = 0x04000000 30 version = 0x02000000 31 r4 = 0x01000000 32 target_info = 0x00800000 33 non_nt_session_key = 0x00400000 34 r5 = 0x00200000 35 identity = 0x00100000 36 extended_session_security = 0x00080000 37 target_type_share = 0x00040000 # Not documented in MS-NLMP 38 target_type_server = 0x00020000 39 target_type_domain = 0x00010000 40 always_sign = 0x00008000 41 local_call = 0x00004000 # Not documented in MS-NLMP 42 oem_workstation_supplied = 0x00002000 43 oem_domain_name_supplied = 0x00001000 44 anonymous = 0x00000800 45 r8 = 0x00000400 46 ntlm = 0x00000200 47 r9 = 0x00000100 48 lm_key = 0x00000080 49 datagram = 0x00000040 50 seal = 0x00000020 51 sign = 0x00000010 52 netware = 0x00000008 # Not documented in MS-NLMP 53 request_target = 0x00000004 54 oem = 0x00000002 55 unicode = 0x00000001 56 57 @classmethod 58 def native_labels(cls) -> typing.Dict["NegotiateFlags", str]: 59 return { 60 NegotiateFlags.key_56: 'NTLMSSP_NEGOTIATE_56', 61 NegotiateFlags.key_exch: 'NTLMSSP_NEGOTIATE_KEY_EXCH', 62 NegotiateFlags.key_128: 'NTLMSSP_NEGOTIATE_128', 63 NegotiateFlags.r1: 'NTLMSSP_RESERVED_R1', 64 NegotiateFlags.r2: 'NTLMSSP_RESERVED_R2', 65 NegotiateFlags.r3: 'NTLMSSP_RESERVED_R3', 66 NegotiateFlags.version: 'NTLMSSP_NEGOTIATE_VERSION', 67 NegotiateFlags.r4: 'NTLMSSP_RESERVED_R4', 68 NegotiateFlags.target_info: 'NTLMSSP_NEGOTIATE_TARGET_INFO', 69 NegotiateFlags.non_nt_session_key: 'NTLMSSP_REQUEST_NON_NT_SESSION_KEY', 70 NegotiateFlags.r5: 'NTLMSSP_RESERVED_R5', 71 NegotiateFlags.identity: 'NTLMSSP_NEGOTIATE_IDENTITY', 72 NegotiateFlags.extended_session_security: 'NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY', 73 NegotiateFlags.target_type_share: 'NTLMSSP_TARGET_TYPE_SHARE - R6', 74 NegotiateFlags.target_type_server: 'NTLMSSP_TARGET_TYPE_SERVER', 75 NegotiateFlags.target_type_domain: 'NTLMSSP_TARGET_TYPE_DOMAIN', 76 NegotiateFlags.always_sign: 'NTLMSSP_NEGOTIATE_ALWAYS_SIGN', 77 NegotiateFlags.local_call: 'NTLMSSP_NEGOTIATE_LOCAL_CALL - R7', 78 NegotiateFlags.oem_workstation_supplied: 'NTLMSSP_NEGOTIATE_OEM_WORKSTATION_SUPPLIED', 79 NegotiateFlags.oem_domain_name_supplied: 'NTLMSSP_NEGOTIATE_OEM_DOMAIN_SUPPLIED', 80 NegotiateFlags.anonymous: 'NTLMSSP_ANOYNMOUS', 81 NegotiateFlags.r8: 'NTLMSSP_RESERVED_R8', 82 NegotiateFlags.ntlm: 'NTLMSSP_NEGOTIATE_NTLM', 83 NegotiateFlags.r9: 'NTLMSSP_RESERVED_R9', 84 NegotiateFlags.lm_key: 'NTLMSSP_NEGOTIATE_LM_KEY', 85 NegotiateFlags.datagram: 'NTLMSSP_NEGOTIATE_DATAGRAM', 86 NegotiateFlags.seal: 'NTLMSSP_NEGOTIATE_SEAL', 87 NegotiateFlags.sign: 'NTLMSSP_NEGOTIATE_SIGN', 88 NegotiateFlags.netware: 'NTLMSSP_NEGOTIATE_NETWARE - R10', 89 NegotiateFlags.request_target: 'NTLMSSP_REQUEST_TARGET', 90 NegotiateFlags.oem: 'NTLMSSP_NEGOTIATE_OEM', 91 NegotiateFlags.unicode: 'NTLMSSP_NEGOTIATE_UNICODE', 92 } 93 94 95class AvId(enum.IntFlag): 96 """ID for an NTLM AV_PAIR. 97 98 These are the IDs that can be set as the `AvId` on an `AV_PAIR`_. 99 100 .. _AV_PAIR: 101 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/83f5e789-660d-4781-8491-5f8c6641f75e 102 """ 103 eol = 0x0000 104 nb_computer_name = 0x0001 105 nb_domain_name = 0x0002 106 dns_computer_name = 0x0003 107 dns_domain_name = 0x0004 108 dns_tree_name = 0x0005 109 flags = 0x0006 110 timestamp = 0x0007 111 single_host = 0x0008 112 target_name = 0x0009 113 channel_bindings = 0x000A 114 115 @classmethod 116 def native_labels(cls) -> typing.Dict["AvId", str]: 117 return { 118 AvId.eol: 'MSV_AV_EOL', 119 AvId.nb_computer_name: 'MSV_AV_NB_COMPUTER_NAME', 120 AvId.nb_domain_name: 'MSV_AV_NB_DOMAIN_NAME', 121 AvId.dns_computer_name: 'MSV_AV_DNS_COMPUTER_NAME', 122 AvId.dns_domain_name: 'MSV_AV_DNS_DOMAIN_NAME', 123 AvId.dns_tree_name: 'MSV_AV_DNS_TREE_NAME', 124 AvId.flags: 'MSV_AV_FLAGS', 125 AvId.timestamp: 'MSV_AV_TIMESTAMP', 126 AvId.single_host: 'MSV_AV_SINGLE_HOST', 127 AvId.target_name: 'MSV_AV_TARGET_NAME', 128 AvId.channel_bindings: 'MSV_AV_CHANNEL_BINDINGS', 129 } 130 131 132class AvFlags(enum.IntFlag): 133 """MsvAvFlags for an AV_PAIR. 134 135 These are the flags that can be set on the MsvAvFlags entry of an NTLM `AV_PAIR`_. 136 137 .. _AV_PAIR: 138 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/83f5e789-660d-4781-8491-5f8c6641f75e 139 """ 140 none = 0x00000000 141 constrained = 0x00000001 142 mic = 0x00000002 143 untrusted_spn = 0x00000004 144 145 @classmethod 146 def native_labels(cls) -> typing.Dict["AvFlags", str]: 147 return { 148 AvFlags.constrained: 'AUTHENTICATION_CONSTRAINED', 149 AvFlags.mic: 'MIC_PROVIDED', 150 AvFlags.untrusted_spn: 'UNTRUSTED_SPN_SOURCE', 151 } 152 153 154class MessageType(enum.IntEnum): 155 negotiate = 1 156 challenge = 2 157 authenticate = 3 158 159 @classmethod 160 def native_labels(cls) -> typing.Dict["MessageType", str]: 161 return { 162 MessageType.negotiate: 'NEGOTIATE_MESSAGE', 163 MessageType.challenge: 'CHALLENGE_MESSAGE', 164 MessageType.authenticate: 'AUTHENTICATE_MESSAGE', 165 } 166 167 168def _get_payload_offset(b_data: memoryview, field_offsets: typing.List[int]) -> int: 169 payload_offset = None 170 171 for field_offset in field_offsets: 172 offset = struct.unpack("<I", b_data[field_offset + 4:field_offset + 8].tobytes())[0] 173 if not payload_offset or (offset and offset < payload_offset): 174 payload_offset = offset 175 176 return payload_offset or len(b_data) 177 178 179def _pack_payload( 180 data: typing.Any, 181 b_payload: bytearray, 182 payload_offset: int, 183 pack_func: typing.Optional[typing.Callable[[typing.Any], bytes]] = None, 184) -> typing.Tuple[bytes, int]: 185 if data: 186 b_data = pack_func(data) if pack_func else data 187 else: 188 b_data = b"" 189 190 b_payload.extend(b_data) 191 length = len(b_data) 192 193 b_field = (struct.pack("<H", length) * 2) + struct.pack("<I", payload_offset) 194 payload_offset += length 195 196 return b_field, payload_offset 197 198 199def _unpack_payload( 200 b_data: memoryview, 201 field_offset: int, 202 unpack_func: typing.Optional[typing.Callable[[bytes], typing.Any]] = None, 203) -> typing.Any: 204 field_len = struct.unpack("<H", b_data[field_offset:field_offset + 2].tobytes())[0] 205 if field_len: 206 field_offset = struct.unpack("<I", b_data[field_offset + 4:field_offset + 8].tobytes())[0] 207 b_value = b_data[field_offset:field_offset + field_len].tobytes() 208 209 return unpack_func(b_value) if unpack_func else b_value 210 211 212class _NTLMMessageMeta(type): 213 __registry: typing.Dict[int, typing.Type] = {} 214 215 def __init__(cls, name, bases, attributes): 216 cls.__registry[cls.MESSAGE_TYPE] = cls 217 218 def __call__(cls, *args, **kwargs): 219 if '_b_data' in kwargs: 220 message_type = struct.unpack("<I", kwargs['_b_data'][8:12])[0] 221 new_cls = cls.__registry[message_type] 222 223 else: 224 new_cls = cls 225 226 return super(_NTLMMessageMeta, new_cls).__call__(*args, **kwargs) 227 228 229class NTLMMessage(metaclass=_NTLMMessageMeta): 230 """ Base NTLM message class that defines the pack and unpack functions. """ 231 232 MESSAGE_TYPE = 0 233 MINIMUM_LENGTH = 0 234 235 def __init__(self, encoding: typing.Optional[str] = None, _b_data: typing.Optional[bytes] = None) -> None: 236 self.signature = b"NTLMSSP\x00" + struct.pack("<I", self.MESSAGE_TYPE) 237 self._encoding = encoding or 'windows-1252' 238 239 if _b_data: 240 if len(_b_data) < self.MINIMUM_LENGTH: 241 raise ValueError("Invalid NTLM %s raw byte length" % self.__class__.__name__) 242 243 self._data = memoryview(bytearray(_b_data)) 244 245 else: 246 self._data = memoryview(b"") 247 248 def pack(self) -> bytes: 249 """ Packs the structure to bytes. """ 250 return self._data.tobytes() 251 252 @staticmethod 253 def unpack(b_data: bytes, encoding: typing.Optional[str] = None) -> "NTLMMessage": 254 """ Unpacks the structure from bytes. """ 255 return NTLMMessage(encoding=encoding, _b_data=b_data) 256 257 258class Negotiate(NTLMMessage): 259 """NTLM Negotiate Message 260 261 This structure represents an NTLM `NEGOTIATE_MESSAGE`_ that can be serialized and deserialized to and from 262 bytes. 263 264 Args: 265 flags: The `NegotiateFlags` that the client has negotiated. 266 domain_name: The `DomainName` of the client authentication domain. 267 workstation: The `Workstation` of the client. 268 version: The `Version` of the client. 269 encoding: The OEM encoding to use for text fields. 270 _b_data: The raw bytes of the message to unpack from. 271 272 .. _NEGOTIATE_MESSAGE: 273 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/b34032e5-3aae-4bc6-84c3-c6d80eadf7f2 274 """ 275 276 MESSAGE_TYPE = MessageType.negotiate 277 MINIMUM_LENGTH = 32 278 279 def __init__( 280 self, 281 flags: int = 0, 282 domain_name: typing.Optional[str] = None, 283 workstation: typing.Optional[str] = None, 284 version: typing.Optional["Version"] = None, 285 encoding: typing.Optional[str] = None, 286 _b_data: typing.Optional[bytes] = None, 287 ) -> None: 288 super(Negotiate, self).__init__(encoding=encoding, _b_data=_b_data) 289 290 if not _b_data: 291 b_payload = bytearray() 292 293 payload_offset = 32 294 295 b_version = b"" 296 if version: 297 flags |= NegotiateFlags.version 298 b_version = version.pack() 299 payload_offset = _pack_payload(b_version, b_payload, payload_offset)[1] 300 301 b_domain_name = b"" 302 if domain_name: 303 flags |= NegotiateFlags.oem_domain_name_supplied 304 b_domain_name = domain_name.encode(self._encoding) 305 b_domain_name_fields, payload_offset = _pack_payload(b_domain_name, b_payload, payload_offset) 306 307 b_workstation = b"" 308 if workstation: 309 flags |= NegotiateFlags.oem_workstation_supplied 310 b_workstation = workstation.encode(self._encoding) 311 b_workstation_fields = _pack_payload(b_workstation, b_payload, payload_offset)[0] 312 313 b_data = bytearray(self.signature) 314 b_data.extend(struct.pack("<I", flags)) 315 b_data.extend(b_domain_name_fields) 316 b_data.extend(b_workstation_fields) 317 b_data.extend(b_payload) 318 319 self._data = memoryview(b_data) 320 321 @property 322 def flags(self) -> int: 323 """ The negotiate flags for the Negotiate message. """ 324 return struct.unpack("<I", self._data[12:16].tobytes())[0] 325 326 @flags.setter 327 def flags(self, value: int) -> None: 328 self._data[12:16] = struct.pack("<I", value) 329 330 @property 331 def domain_name(self) -> typing.Optional[str]: 332 """ The name of the client authentication domain. """ 333 return to_text(_unpack_payload(self._data, 16), encoding=self._encoding, errors='replace', 334 nonstring='passthru') 335 336 @property 337 def workstation(self) -> typing.Optional[str]: 338 """ The name of the client machine. """ 339 return to_text(_unpack_payload(self._data, 24), encoding=self._encoding, errors='replace', 340 nonstring='passthru') 341 342 @property 343 def version(self) -> typing.Optional["Version"]: 344 """ The client NTLM version. """ 345 payload_offset = self._payload_offset 346 347 # If the payload offset is at 40 or more then the Version, or at least empty bytes, is in the payload. 348 if payload_offset >= 40: 349 return Version.unpack(self._data[32:40].tobytes()) 350 351 else: 352 return None 353 354 @property 355 def _payload_offset(self) -> int: 356 """ Gets the offset of the first payload value. """ 357 return _get_payload_offset(self._data, [16, 24]) 358 359 360class Challenge(NTLMMessage): 361 """NTLM Challenge Message 362 363 This structure represents an NTLM `CHALLENGE_MESSAGE`_ that can be serialized and deserialized to and from 364 bytes. 365 366 Args: 367 flags: The `NegotiateFlags` that the client has negotiated. 368 server_challenge: The random 64-bit `ServerChallenge` nonce. 369 target_name: The name of the acceptor server. 370 target_info: The variable length `TargetInfo` information. 371 version: The `Version` of the server. 372 encoding: The OEM encoding to use for text fields if `NTLMSSP_NEGOTIATE_UNICODE` was not supported. 373 _b_data: The raw NTLM Challenge bytes to unpack from. 374 375 .. _CHALLENGE_MESSAGE: 376 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/801a4681-8809-4be9-ab0d-61dcfe762786 377 """ 378 379 MESSAGE_TYPE = MessageType.challenge 380 MINIMUM_LENGTH = 48 381 382 def __init__( 383 self, 384 flags: int = 0, 385 server_challenge: typing.Optional[bytes] = None, 386 target_name: typing.Optional[str] = None, 387 target_info: typing.Optional["TargetInfo"] = None, 388 version: typing.Optional["Version"] = None, 389 encoding: typing.Optional[str] = None, 390 _b_data: typing.Optional[bytes] = None, 391 ): 392 super(Challenge, self).__init__(encoding=encoding, _b_data=_b_data) 393 394 if _b_data: 395 self._encoding = 'utf-16-le' if self.flags & NegotiateFlags.unicode else self._encoding 396 397 else: 398 self._encoding = 'utf-16-le' if flags & NegotiateFlags.unicode else self._encoding 399 400 b_payload = bytearray() 401 payload_offset = 48 402 403 b_version = b"" 404 if version: 405 flags |= NegotiateFlags.version 406 b_version = version.pack() 407 payload_offset = _pack_payload(b_version, b_payload, payload_offset)[1] 408 409 b_target_name = b"" 410 if target_name: 411 flags |= NegotiateFlags.request_target 412 b_target_name = target_name.encode(self._encoding) 413 b_target_name_fields, payload_offset = _pack_payload(b_target_name, b_payload, payload_offset) 414 415 b_target_info = b"" 416 if target_info: 417 flags |= NegotiateFlags.target_info 418 b_target_info = target_info.pack() 419 b_target_info_fields = _pack_payload(b_target_info, b_payload, payload_offset)[0] 420 421 b_data = bytearray(self.signature) 422 b_data.extend(b_target_name_fields) 423 b_data.extend(struct.pack("<I", flags)) 424 b_data.extend(b"\x00" * 8) # ServerChallenge, set after self._data is initialised. 425 b_data.extend(b"\x00" * 8) # Reserved 426 b_data.extend(b_target_info_fields) 427 b_data.extend(b_payload) 428 429 self._data = memoryview(b_data) 430 431 if server_challenge: 432 self.server_challenge = server_challenge 433 434 @property 435 def target_name(self) -> typing.Optional[str]: 436 """ The name of the server authentication realm. """ 437 return to_text(_unpack_payload(self._data, 12), encoding=self._encoding, nonstring='passthru') 438 439 @property 440 def flags(self) -> int: 441 """ The negotiate flags supported by the server. """ 442 return struct.unpack("<I", self._data[20:24].tobytes())[0] 443 444 @flags.setter 445 def flags(self, value: int) -> None: 446 self._data[20:24] = struct.pack("<I", value) 447 448 @property 449 def server_challenge(self) -> bytes: 450 """ The server's 8 byte nonce challenge. """ 451 return self._data[24:32].tobytes() 452 453 @server_challenge.setter 454 def server_challenge(self, value: bytes) -> None: 455 if not value or len(value) != 8: 456 raise ValueError("NTLM Challenge ServerChallenge must be 8 bytes long") 457 458 self._data[24:32] = value 459 460 @property 461 def target_info(self) -> typing.Optional["TargetInfo"]: 462 """ The AV_PAIR structures generated by the server. """ 463 return _unpack_payload(self._data, 40, lambda d: TargetInfo.unpack(d)) 464 465 @property 466 def version(self) -> typing.Optional["Version"]: 467 """ The server NTLM version. """ 468 payload_offset = self._payload_offset 469 470 # If the payload offset is at 56 or more then the Version, or at least empty bytes, is in the payload. 471 if payload_offset >= 56: 472 return Version.unpack(self._data[48:56].tobytes()) 473 474 else: 475 return None 476 477 @property 478 def _payload_offset(self) -> int: 479 """ Gets the offset of the first payload value. """ 480 return _get_payload_offset(self._data, [12, 40]) 481 482 483class Authenticate(NTLMMessage): 484 """NTLM Authentication Message 485 486 This structure represents an NTLM `AUTHENTICATION_MESSAGE`_ that can be serialized and deserialized to and from 487 bytes. 488 489 Args: 490 flags: The `NegotiateFlags` that the client has negotiated. 491 lm_challenge_response: The `LmChallengeResponse` for the client's secret. 492 nt_challenge_response: The `NtChallengeResponse` for the client's secret. 493 domain_name: The `DomainName` for the client. 494 username: The `UserName` for the cleint. 495 workstation: The `Workstation` for the client. 496 encrypted_session_key: The `EncryptedRandomSessionKey` for the set up context. 497 version: The `Version` of the client. 498 mic: The `MIC` for the authentication exchange. 499 encoding: The OEM encoding to use for text fields if `NTLMSSP_NEGOTIATE_UNICODE` was not supported. 500 _b_data: The raw NTLM Authenticate bytes to unpack from. 501 502 .. _AUTHENTICATION_MESSAGE: 503 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/033d32cc-88f9-4483-9bf2-b273055038ce 504 """ 505 506 MESSAGE_TYPE = MessageType.authenticate 507 MINIMUM_LENGTH = 64 508 509 def __init__( 510 self, 511 flags: int = 0, 512 lm_challenge_response: typing.Optional[bytes] = None, 513 nt_challenge_response: typing.Optional[bytes] = None, 514 domain_name: typing.Optional[str] = None, 515 username: typing.Optional[str] = None, 516 workstation: typing.Optional[str] = None, 517 encrypted_session_key: typing.Optional[bytes] = None, 518 version: typing.Optional["Version"] = None, 519 mic: typing.Optional[bytes] = None, 520 encoding: typing.Optional[str] = None, 521 _b_data: typing.Optional[bytes] = None, 522 ) -> None: 523 super(Authenticate, self).__init__(encoding=encoding, _b_data=_b_data) 524 525 if _b_data: 526 self._encoding = 'utf-16-le' if self.flags & NegotiateFlags.unicode else self._encoding 527 528 else: 529 self._encoding = 'utf-16-le' if flags & NegotiateFlags.unicode else self._encoding 530 531 b_payload = bytearray() 532 533 payload_offset = 64 534 535 # While MS server accept a blank version field, other implementations aren't so kind. No need to be strict 536 # about it and only add the version bytes if it's present. 537 b_version = b"" 538 if version: 539 flags |= NegotiateFlags.version 540 b_version = version.pack() 541 payload_offset = _pack_payload(b_version, b_payload, payload_offset)[1] 542 543 # MIC 544 payload_offset = _pack_payload(b"\x00" * 16, b_payload, payload_offset)[1] 545 546 b_lm_response_fields, payload_offset = _pack_payload(lm_challenge_response, b_payload, payload_offset) 547 b_nt_response_fields, payload_offset = _pack_payload(nt_challenge_response, b_payload, payload_offset) 548 b_domain_fields, payload_offset = _pack_payload(domain_name, b_payload, payload_offset, 549 lambda d: d.encode(self._encoding)) 550 b_username_fields, payload_offset = _pack_payload(username, b_payload, payload_offset, 551 lambda d: d.encode(self._encoding)) 552 b_workstation_fields, payload_offset = _pack_payload(workstation, b_payload, payload_offset, 553 lambda d: d.encode(self._encoding)) 554 if encrypted_session_key: 555 flags |= NegotiateFlags.key_exch 556 b_session_key_fields = _pack_payload(encrypted_session_key, b_payload, payload_offset)[0] 557 558 b_data = bytearray(self.signature) 559 b_data.extend(b_lm_response_fields) 560 b_data.extend(b_nt_response_fields) 561 b_data.extend(b_domain_fields) 562 b_data.extend(b_username_fields) 563 b_data.extend(b_workstation_fields) 564 b_data.extend(b_session_key_fields) 565 b_data.extend(struct.pack("<I", flags)) 566 b_data.extend(b_payload) 567 568 self._data = memoryview(b_data) 569 570 if mic: 571 self.mic = mic 572 573 @property 574 def lm_challenge_response(self) -> typing.Optional[bytes]: 575 """ The LmChallengeResponse or None if not set. """ 576 return _unpack_payload(self._data, 12) 577 578 @property 579 def nt_challenge_response(self) -> typing.Optional[bytes]: 580 """ The NtChallengeResponse or None if not set. """ 581 return _unpack_payload(self._data, 20) 582 583 @property 584 def domain_name(self) -> typing.Optional[str]: 585 """ The domain or computer name hosting the user account. """ 586 return to_text(_unpack_payload(self._data, 28), encoding=self._encoding, nonstring='passthru') 587 588 @property 589 def user_name(self) -> typing.Optional[str]: 590 """ The name of the user to be authenticated. """ 591 return to_text(_unpack_payload(self._data, 36), encoding=self._encoding, nonstring='passthru') 592 593 @property 594 def workstation(self) -> typing.Optional[str]: 595 """ The name of the computer to which the user is logged on. """ 596 return to_text(_unpack_payload(self._data, 44), encoding=self._encoding, nonstring='passthru') 597 598 @property 599 def encrypted_random_session_key(self) -> typing.Optional[bytes]: 600 """ The client's encrypted random session key. """ 601 return _unpack_payload(self._data, 52) 602 603 @property 604 def flags(self) -> int: 605 """ The negotiate flags supported by the client and server. """ 606 return struct.unpack("<I", self._data[60:64].tobytes())[0] 607 608 @flags.setter 609 def flags(self, value: int) -> None: 610 self._data[60:64] = struct.pack("<I", value) 611 612 @property 613 def version(self) -> typing.Optional["Version"]: 614 """ The client NTLM version. """ 615 payload_offset = self._payload_offset 616 617 # If the payload offset is at 64 (no MIC or Version) or 80 (only MIC) then no version is present. 618 if payload_offset not in [64, 80] and payload_offset >= 72: 619 return Version.unpack(self._data[64:72].tobytes()) 620 621 else: 622 return None 623 624 @property 625 def mic(self) -> typing.Optional[bytes]: 626 """ The MIC for the Authenticate message. """ 627 mic_offset = self._get_mic_offset() 628 if mic_offset: 629 return self._data.tobytes()[mic_offset:mic_offset + 16] 630 631 else: 632 return None 633 634 @mic.setter 635 def mic(self, value: bytes) -> None: 636 if len(value) != 16: 637 raise ValueError("NTLM Authenticate MIC must be 16 bytes long") 638 639 mic_offset = self._get_mic_offset() 640 if mic_offset: 641 self._data[mic_offset:mic_offset + 16] = value 642 else: 643 raise ValueError("Cannot set MIC on an Authenticate message with no MIC present") 644 645 @property 646 def _payload_offset(self) -> int: 647 """ Gets the offset of the first payload value. """ 648 return _get_payload_offset(self._data, [12, 20, 28, 36, 44, 52]) 649 650 def _get_mic_offset(self) -> int: 651 """ Gets the offset of the MIC structure if present. """ 652 payload_offset = self._payload_offset 653 654 # If the payload offset is 88 or more then we must have the Version (8 bytes) and the MIC (16 bytes) plus 655 # any random data after that. 656 if payload_offset >= 88: 657 return 72 658 659 # If the payload offset is between 80 and 88, then we should have just the MIC and no Version. 660 elif payload_offset >= 80: 661 return 64 662 663 # Not enough room for a MIC between the minimum size and the payload offset. 664 else: 665 return 0 666 667 668class FileTime(datetime.datetime): 669 """Windows FILETIME structure. 670 671 FILETIME structure representing number of 100-nanosecond intervals that have elapsed since January 1, 1601 UTC. 672 This subclasses the datetime object to provide a similar interface but with the `nanosecond` attribute. 673 674 Attrs: 675 nanosecond (int): The number of nanoseconds (< 1000) in the FileTime. Note this only has a precision of up to 676 100 nanoseconds. 677 678 .. _FILETIME: 679 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-dtyp/2c57429b-fdd4-488f-b5fc-9e4cf020fcdf 680 """ 681 682 _EPOCH_FILETIME = 116444736000000000 # 1970-01-01 as FILETIME. 683 684 def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> "FileTime": 685 ns = 0 686 if 'nanosecond' in kwargs: 687 ns = kwargs.pop('nanosecond') 688 689 dt = super(FileTime, cls).__new__(cls, *args, **kwargs) 690 dt.nanosecond = ns 691 692 return dt 693 694 def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None: 695 super().__init__() 696 self.nanosecond = getattr(self, "nanosecond", None) or 0 697 698 @classmethod 699 def now(cls, tz: typing.Optional[datetime.tzinfo] = None) -> "FileTime": 700 """ Construct a FileTime from the current time and optional time zone info. """ 701 return FileTime.from_datetime(datetime.datetime.now(tz=tz)) 702 703 @classmethod 704 def from_datetime(cls, dt: datetime.datetime, ns: int = 0) -> "FileTime": 705 """ Creates a FileTime object from a datetime object. """ 706 return FileTime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour, minute=dt.minute, second=dt.second, 707 microsecond=dt.microsecond, tzinfo=dt.tzinfo, nanosecond=ns) 708 709 def __str__(self) -> str: 710 """ Displays the datetime in ISO 8601 including the 100th nanosecond internal like .NET does. """ 711 fraction_seconds = "" 712 713 if self.microsecond or self.nanosecond: 714 fraction_seconds = self.strftime('.%f') 715 716 if self.nanosecond: 717 fraction_seconds += str(self.nanosecond // 100) 718 719 timezone = 'Z' 720 if self.tzinfo: 721 utc_offset = self.strftime('%z') 722 timezone = "%s:%s" % (utc_offset[:3], utc_offset[3:]) 723 724 # strftime doesn't support dates < 1900 on Python 2.7 725 return '{0}-{1:02d}-{2:02d}T{3:02d}:{4:02d}:{5:02d}{6}{7}'.format( 726 self.year, self.month, self.day, self.hour, self.minute, self.second, fraction_seconds, timezone) 727 728 def pack(self) -> bytes: 729 """ Packs the structure to bytes. """ 730 # Make sure we are dealing with a timezone aware datetime 731 utc_tz = datetime.timezone.utc 732 utc_dt = self.replace(tzinfo=self.tzinfo if self.tzinfo else utc_tz) 733 734 # Get the time since UTC EPOCH in microseconds 735 td = utc_dt.astimezone(utc_tz) - datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=utc_tz) 736 epoch_time_ms = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) 737 738 # Add the EPOCH_FILETIME to the microseconds since EPOCH and finally the nanoseconds part. 739 ns100 = FileTime._EPOCH_FILETIME + (epoch_time_ms * 10) + (self.nanosecond // 100) 740 741 return struct.pack("<Q", ns100) 742 743 @staticmethod 744 def unpack(b_data: bytes) -> "FileTime": 745 """ Unpacks the structure from bytes. """ 746 filetime = struct.unpack("<Q", b_data)[0] # 100 nanosecond intervals since 1601-01-01. 747 748 # Create a datetime object based on the filetime microseconds 749 epoch_time_ms = (filetime - FileTime._EPOCH_FILETIME) // 10 750 dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(microseconds=epoch_time_ms) 751 752 # Create the FileTime object from the datetime object and add the nanoseconds. 753 ns = int(filetime % 10) * 100 754 755 return FileTime.from_datetime(dt, ns=ns) 756 757 758class NTClientChallengeV2: 759 """NTLMv2 Client Challenge 760 761 The `NTLMv2_CLIENT_CHALLENGE`_ structure defines the client challenge in the AUTHENTICATE_MESSAGE. This structure 762 is only used when NTLMv2 authentication is configured and is transported in the NT Challenge Response. 763 764 Args: 765 time_stamp: The timestamp, defaults to the current time. 766 client_challenge: The 8 byte nonce generated by the client. 767 av_pairs: The TargetInfo AV_PAIRS for the client challenge. 768 _b_data: Raw byte string for the NTClientChallengeV2, when set the other args are ignored. 769 770 .. _NTLMv2_CLIENT_CHALLENGE: 771 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/aee311d6-21a7-4470-92a5-c4ecb022a87b 772 """ 773 774 def __init__( 775 self, 776 time_stamp: typing.Optional["FileTime"] = None, 777 client_challenge: typing.Optional[bytes] = None, 778 av_pairs: typing.Optional["TargetInfo"] = None, 779 _b_data: typing.Optional[bytes] = None, 780 ) -> None: 781 if _b_data: 782 if len(_b_data) < 32: 783 raise ValueError("Invalid NTClientChallengeV2 raw byte length") 784 785 self._data = memoryview(bytearray(_b_data)) 786 787 else: 788 time_stamp = time_stamp or FileTime.now() 789 av_pairs = av_pairs or TargetInfo() 790 791 b_data = bytearray(b"\x01\x01\x00\x00\x00\x00\x00\x00") 792 b_data.extend(time_stamp.pack()) 793 b_data.extend(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00") 794 b_data.extend(av_pairs.pack()) 795 796 self._data = memoryview(b_data) 797 798 client_challenge = client_challenge or b"\x00" * 8 799 self.challenge_from_client = client_challenge 800 801 @property 802 def resp_type(self) -> int: 803 """ The current response type version, must be set to 1. """ 804 return struct.unpack("B", self._data[:1].tobytes())[0] 805 806 @resp_type.setter 807 def resp_type(self, value: int) -> None: 808 self._data[:1] = struct.pack("B", value) 809 810 @property 811 def hi_resp_type(self) -> int: 812 """ The maximum response type supported, must be set to 1. """ 813 return struct.unpack("B", self._data[1:2].tobytes())[0] 814 815 @hi_resp_type.setter 816 def hi_resp_type(self, value: int) -> None: 817 self._data[1:2] = struct.pack("B", value) 818 819 @property 820 def time_stamp(self) -> "FileTime": 821 """ The current system time. """ 822 return FileTime.unpack(self._data[8:16].tobytes()) 823 824 @time_stamp.setter 825 def time_stamp(self, value: "FileTime") -> None: 826 self._data[8:16] = value.pack() 827 828 @property 829 def challenge_from_client(self) -> bytes: 830 """ 8 byte client challenge. """ 831 return self._data[16:24].tobytes() 832 833 @challenge_from_client.setter 834 def challenge_from_client(self, value: bytes) -> None: 835 if len(value) != 8: 836 raise ValueError("NTClientChallengeV2 ChallengeFromClient must be 8 bytes long") 837 self._data[16:24] = value 838 839 @property 840 def av_pairs(self) -> "TargetInfo": 841 """ The target info AV_PAIR structures. """ 842 return TargetInfo.unpack(self._data[28:].tobytes()) 843 844 def pack(self) -> bytes: 845 """ Packs the NTClientChallengeV2 to bytes. """ 846 return self._data.tobytes() 847 848 @staticmethod 849 def unpack(b_data: bytes) -> "NTClientChallengeV2": 850 """ Unpacks the raw bytes to the NTClientChallengeV2 structure. """ 851 return NTClientChallengeV2(_b_data=b_data) 852 853 854class TargetInfo(collections.OrderedDict): 855 """A collection of AV_PAIR structures for the TargetInfo field. 856 857 The `AV_PAIR`_ structure defines an attribute/value pair and sequences of these pairs are using in the 858 :class:`Challenge` and :class:`Authenticate` messages. The value for each pair depends on the AvId specified. 859 Each value can be get/set/del like a normal dictionary where the key is the AvId of the value. 860 861 .. _AV_PAIR: 862 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/83f5e789-660d-4781-8491-5f8c6641f75e 863 """ 864 865 _FIELD_TYPES = { 866 'text': (AvId.nb_computer_name, AvId.nb_domain_name, AvId.dns_computer_name, AvId.dns_domain_name, 867 AvId.dns_tree_name, AvId.target_name), 868 'int32': (AvId.flags,), 869 'struct': (AvId.timestamp, AvId.single_host), 870 } 871 872 def __setitem__(self, key: AvId, value: typing.Any) -> None: 873 if isinstance(value, bytes): 874 if key == AvId.timestamp: 875 value = FileTime.unpack(value) 876 elif key == AvId.single_host: 877 value = SingleHost.unpack(value) 878 879 super(TargetInfo, self).__setitem__(key, value) 880 881 def pack(self) -> bytes: 882 """ Packs the structure to bytes. """ 883 b_data = io.BytesIO() 884 885 for av_id, value in self.items(): 886 # MsvAvEOL should only be set at the end, will just ignore these entries. 887 if av_id == AvId.eol: 888 continue 889 890 if av_id in self._FIELD_TYPES['text']: 891 b_value = value.encode('utf-16-le') 892 elif av_id in self._FIELD_TYPES['int32']: 893 b_value = struct.pack("<I", value) 894 elif av_id in self._FIELD_TYPES['struct']: 895 b_value = value.pack() 896 else: 897 b_value = value 898 899 b_data.write(struct.pack("<HH", av_id, len(b_value)) + b_value) 900 901 b_data.write(b"\x00\x00\x00\x00") # MsvAvEOL 902 return b_data.getvalue() 903 904 @staticmethod 905 def unpack(b_data: bytes) -> "TargetInfo": 906 """ Unpacks the structure from bytes. """ 907 target_info = TargetInfo() 908 b_io = io.BytesIO(b_data) 909 910 b_av_id = b_io.read(2) 911 912 while b_av_id: 913 av_id = struct.unpack("<H", b_av_id)[0] 914 length = struct.unpack("<H", b_io.read(2))[0] 915 b_value = b_io.read(length) 916 917 value: typing.Any 918 if av_id in TargetInfo._FIELD_TYPES['text']: 919 # All AV_PAIRS are UNICODE encoded. 920 value = b_value.decode('utf-16-le') 921 922 elif av_id in TargetInfo._FIELD_TYPES['int32']: 923 value = AvFlags(struct.unpack("<I", b_value)[0]) 924 925 elif av_id == AvId.timestamp: 926 value = FileTime.unpack(b_value) 927 928 elif av_id == AvId.single_host: 929 value = SingleHost.unpack(b_value) 930 931 else: 932 value = b_value 933 934 target_info[AvId(av_id)] = value 935 b_av_id = b_io.read(2) 936 937 return target_info 938 939 940class SingleHost: 941 """Single_Host_Data structure for NTLM TargetInfo entry. 942 943 `Single_Host_Data`_ structure allows a client to send machine-specific information within an authentication 944 exchange to services on the same machine. If the server and client platforms are different or if they are on 945 different hosts, then the information MUST be ignores. 946 947 Args: 948 size: A 32-bit unsigned int that defines size of the structure. 949 z4: A 32-bit integer value, currently set to 0. 950 custom_data: An 8-byte platform-specific blob containing info only relevant when the client and server are on 951 the same host. 952 machine_id: A 32-byte random number created at computer startup to identify the calling machine. 953 _b_data: Create a SingleHost object from the raw data byte string. 954 955 Attributes: 956 size: See args. 957 z4: See args. 958 custom_data: See args. 959 machine_id: See args. 960 961 .. _Single_Host_Data: 962 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/f221c061-cc40-4471-95da-d2ff71c85c5b 963 """ 964 965 def __init__( 966 self, 967 size: int = 0, 968 z4: int = 0, 969 custom_data: typing.Optional[bytes] = None, 970 machine_id: typing.Optional[bytes] = None, 971 _b_data: typing.Optional[bytes] = None, 972 ) -> None: 973 if _b_data: 974 if len(_b_data) != 48: 975 raise ValueError("SingleHost bytes must have a length of 48") 976 self._data = memoryview(_b_data) 977 978 else: 979 self._data = memoryview(bytearray(48)) 980 self.size = size 981 self.z4 = z4 982 self.custom_data = custom_data or b"\x00" * 8 983 self.machine_id = machine_id or b"\x00" * 32 984 985 def __eq__(self, other: object) -> bool: 986 if not isinstance(other, (bytes, SingleHost)): 987 return False 988 989 if isinstance(other, SingleHost): 990 other = other.pack() 991 992 return self.pack() == other 993 994 @property 995 def size(self) -> int: 996 return struct.unpack("<I", self._data[:4].tobytes())[0] 997 998 @size.setter 999 def size(self, value: int) -> None: 1000 self._data[:4] = struct.pack("<I", value) 1001 1002 @property 1003 def z4(self) -> int: 1004 return struct.unpack("<I", self._data[4:8].tobytes())[0] 1005 1006 @z4.setter 1007 def z4(self, value: int) -> None: 1008 self._data[4:8] = struct.pack("<I", value) 1009 1010 @property 1011 def custom_data(self) -> bytes: 1012 return self._data[8:16].tobytes() 1013 1014 @custom_data.setter 1015 def custom_data(self, value: bytes) -> None: 1016 if len(value) != 8: 1017 raise ValueError("custom_data length must be 8 bytes long") 1018 1019 self._data[8:16] = value 1020 1021 @property 1022 def machine_id(self) -> bytes: 1023 return self._data[16:48].tobytes() 1024 1025 @machine_id.setter 1026 def machine_id(self, value: bytes) -> None: 1027 if len(value) != 32: 1028 raise ValueError("machine_id length must be 32 bytes long") 1029 1030 self._data[16:48] = value 1031 1032 def pack(self) -> bytes: 1033 """ Packs the structure to bytes. """ 1034 return self._data.tobytes() 1035 1036 @staticmethod 1037 def unpack(b_data: bytes) -> "SingleHost": 1038 """ Creates a SignleHost object from raw bytes. """ 1039 return SingleHost(_b_data=b_data) 1040 1041 1042class Version: 1043 """A structure contains the OS information. 1044 1045 The `VERSION`_ structure contains operating system version information that SHOULD be ignored. This structure is 1046 used for debugging purposes only and its value does not affect NTLM message processing. It is populated in the NTLM 1047 messages only if `NTLMSSP_NEGOTIATE_VERSION` (`NegotiateFlags.version`) is negotiated. 1048 1049 Args: 1050 major: See args. The 8-bit unsigned int for the version major part. 1051 minor: The 8-bit unsigned int for the version minor part. 1052 build: The 16-bit unsigned int for the version build part. 1053 revision: An 8-bit unsigned integer for the current NTLMSSP revision. This field SHOULD be `0x0F`. 1054 _b_data: Create a Version object from the raw data byte string. 1055 1056 Attrs: 1057 major: See args. 1058 minor: See args. 1059 build: See args. 1060 reserved: A reserved 3-byte field that isn't used in the NTLM spec. 1061 revision: See args. 1062 1063 .. _VERSION: 1064 https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/b1a6ceb2-f8ad-462b-b5af-f18527c48175 1065 """ 1066 1067 def __init__( 1068 self, 1069 major: int = 0, 1070 minor: int = 0, 1071 build: int = 0, 1072 revision: int = 0x0F, 1073 _b_data: typing.Optional[bytes] = None, 1074 ) -> None: 1075 if _b_data: 1076 if len(_b_data) != 8: 1077 raise ValueError("Version bytes must have a length of 8") 1078 1079 self._data = memoryview(_b_data) 1080 1081 else: 1082 self._data = memoryview(bytearray(8)) 1083 self.major = major 1084 self.minor = minor 1085 self.build = build 1086 self.revision = revision 1087 1088 def __eq__(self, other: object) -> bool: 1089 if not isinstance(other, (bytes, Version)): 1090 return False 1091 1092 if isinstance(other, Version): 1093 other = other.pack() 1094 1095 return self.pack() == other 1096 1097 def __len__(self): 1098 return 8 1099 1100 @property 1101 def major(self) -> int: 1102 return struct.unpack("B", self._data[0:1].tobytes())[0] 1103 1104 @major.setter 1105 def major(self, value: int) -> None: 1106 self._data[0:1] = struct.pack("B", value) 1107 1108 @property 1109 def minor(self) -> int: 1110 return struct.unpack("B", self._data[1:2].tobytes())[0] 1111 1112 @minor.setter 1113 def minor(self, value: int) -> None: 1114 self._data[1:2] = struct.pack("B", value) 1115 1116 @property 1117 def build(self) -> int: 1118 return struct.unpack("<H", self._data[2:4].tobytes())[0] 1119 1120 @build.setter 1121 def build(self, value: int) -> None: 1122 self._data[2:4] = struct.pack("<H", value) 1123 1124 @property 1125 def reserved(self) -> bytes: 1126 return self._data[4:7].tobytes() 1127 1128 @property 1129 def revision(self) -> int: 1130 return struct.unpack("B", self._data[7:8].tobytes())[0] 1131 1132 @revision.setter 1133 def revision(self, value: int) -> None: 1134 self._data[7:8] = struct.pack("B", value) 1135 1136 def __repr__(self) -> str: 1137 return "<{0}.{1} {2}.{3}.{4}.{5}>".format(type(self).__module__, type(self).__name__, self.major, self.minor, 1138 self.build, self.revision) 1139 1140 def __str__(self) -> str: 1141 return "%s.%s.%s.%s" % (self.major, self.minor, self.build, self.revision) 1142 1143 def pack(self) -> bytes: 1144 """ Packs the structure to bytes. """ 1145 return self._data.tobytes() 1146 1147 @staticmethod 1148 def get_current() -> "Version": 1149 """ Generates an NTLM Version structure based on the pyspnego package version. """ 1150 versions = [] 1151 for v in pyspnego_version.split('.', 3): 1152 if not v: 1153 continue 1154 1155 match = re.match(r'^(\d+)', v) 1156 if match: 1157 versions.append(int(match.group(1))) 1158 1159 versions += [0] * (3 - len(versions)) 1160 1161 return Version(major=int(versions[0]), minor=int(versions[1]), build=int(versions[2])) 1162 1163 @staticmethod 1164 def unpack(b_data: bytes) -> "Version": 1165 """ Creates a Version object from raw bytes. """ 1166 return Version(_b_data=b_data) 1167