1# Copyright: (c) 2020, Jordan Borean (@jborean93) <jborean93@gmail.com>
2# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
3
4import collections
5import datetime
6import enum
7import io
8import re
9import struct
10import typing
11
12from spnego._text import to_text
13from spnego._version import __version__ as pyspnego_version
14
15
16class NegotiateFlags(enum.IntFlag):
17    """NTLM Negotiation flags.
18
19    Used during NTLM negotiation to negotiate the capabilities between the client and server.
20
21    .. _NEGOTIATE:
22        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/99d90ff4-957f-4c8a-80e4-5bfe5a9a9832
23    """
24    key_56 = 0x80000000
25    key_exch = 0x40000000
26    key_128 = 0x20000000
27    r1 = 0x10000000
28    r2 = 0x08000000
29    r3 = 0x04000000
30    version = 0x02000000
31    r4 = 0x01000000
32    target_info = 0x00800000
33    non_nt_session_key = 0x00400000
34    r5 = 0x00200000
35    identity = 0x00100000
36    extended_session_security = 0x00080000
37    target_type_share = 0x00040000  # Not documented in MS-NLMP
38    target_type_server = 0x00020000
39    target_type_domain = 0x00010000
40    always_sign = 0x00008000
41    local_call = 0x00004000  # Not documented in MS-NLMP
42    oem_workstation_supplied = 0x00002000
43    oem_domain_name_supplied = 0x00001000
44    anonymous = 0x00000800
45    r8 = 0x00000400
46    ntlm = 0x00000200
47    r9 = 0x00000100
48    lm_key = 0x00000080
49    datagram = 0x00000040
50    seal = 0x00000020
51    sign = 0x00000010
52    netware = 0x00000008  # Not documented in MS-NLMP
53    request_target = 0x00000004
54    oem = 0x00000002
55    unicode = 0x00000001
56
57    @classmethod
58    def native_labels(cls) -> typing.Dict["NegotiateFlags", str]:
59        return {
60            NegotiateFlags.key_56: 'NTLMSSP_NEGOTIATE_56',
61            NegotiateFlags.key_exch: 'NTLMSSP_NEGOTIATE_KEY_EXCH',
62            NegotiateFlags.key_128: 'NTLMSSP_NEGOTIATE_128',
63            NegotiateFlags.r1: 'NTLMSSP_RESERVED_R1',
64            NegotiateFlags.r2: 'NTLMSSP_RESERVED_R2',
65            NegotiateFlags.r3: 'NTLMSSP_RESERVED_R3',
66            NegotiateFlags.version: 'NTLMSSP_NEGOTIATE_VERSION',
67            NegotiateFlags.r4: 'NTLMSSP_RESERVED_R4',
68            NegotiateFlags.target_info: 'NTLMSSP_NEGOTIATE_TARGET_INFO',
69            NegotiateFlags.non_nt_session_key: 'NTLMSSP_REQUEST_NON_NT_SESSION_KEY',
70            NegotiateFlags.r5: 'NTLMSSP_RESERVED_R5',
71            NegotiateFlags.identity: 'NTLMSSP_NEGOTIATE_IDENTITY',
72            NegotiateFlags.extended_session_security: 'NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY',
73            NegotiateFlags.target_type_share: 'NTLMSSP_TARGET_TYPE_SHARE - R6',
74            NegotiateFlags.target_type_server: 'NTLMSSP_TARGET_TYPE_SERVER',
75            NegotiateFlags.target_type_domain: 'NTLMSSP_TARGET_TYPE_DOMAIN',
76            NegotiateFlags.always_sign: 'NTLMSSP_NEGOTIATE_ALWAYS_SIGN',
77            NegotiateFlags.local_call: 'NTLMSSP_NEGOTIATE_LOCAL_CALL - R7',
78            NegotiateFlags.oem_workstation_supplied: 'NTLMSSP_NEGOTIATE_OEM_WORKSTATION_SUPPLIED',
79            NegotiateFlags.oem_domain_name_supplied: 'NTLMSSP_NEGOTIATE_OEM_DOMAIN_SUPPLIED',
80            NegotiateFlags.anonymous: 'NTLMSSP_ANOYNMOUS',
81            NegotiateFlags.r8: 'NTLMSSP_RESERVED_R8',
82            NegotiateFlags.ntlm: 'NTLMSSP_NEGOTIATE_NTLM',
83            NegotiateFlags.r9: 'NTLMSSP_RESERVED_R9',
84            NegotiateFlags.lm_key: 'NTLMSSP_NEGOTIATE_LM_KEY',
85            NegotiateFlags.datagram: 'NTLMSSP_NEGOTIATE_DATAGRAM',
86            NegotiateFlags.seal: 'NTLMSSP_NEGOTIATE_SEAL',
87            NegotiateFlags.sign: 'NTLMSSP_NEGOTIATE_SIGN',
88            NegotiateFlags.netware: 'NTLMSSP_NEGOTIATE_NETWARE - R10',
89            NegotiateFlags.request_target: 'NTLMSSP_REQUEST_TARGET',
90            NegotiateFlags.oem: 'NTLMSSP_NEGOTIATE_OEM',
91            NegotiateFlags.unicode: 'NTLMSSP_NEGOTIATE_UNICODE',
92        }
93
94
95class AvId(enum.IntFlag):
96    """ID for an NTLM AV_PAIR.
97
98    These are the IDs that can be set as the `AvId` on an `AV_PAIR`_.
99
100    .. _AV_PAIR:
101        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/83f5e789-660d-4781-8491-5f8c6641f75e
102    """
103    eol = 0x0000
104    nb_computer_name = 0x0001
105    nb_domain_name = 0x0002
106    dns_computer_name = 0x0003
107    dns_domain_name = 0x0004
108    dns_tree_name = 0x0005
109    flags = 0x0006
110    timestamp = 0x0007
111    single_host = 0x0008
112    target_name = 0x0009
113    channel_bindings = 0x000A
114
115    @classmethod
116    def native_labels(cls) -> typing.Dict["AvId", str]:
117        return {
118            AvId.eol: 'MSV_AV_EOL',
119            AvId.nb_computer_name: 'MSV_AV_NB_COMPUTER_NAME',
120            AvId.nb_domain_name: 'MSV_AV_NB_DOMAIN_NAME',
121            AvId.dns_computer_name: 'MSV_AV_DNS_COMPUTER_NAME',
122            AvId.dns_domain_name: 'MSV_AV_DNS_DOMAIN_NAME',
123            AvId.dns_tree_name: 'MSV_AV_DNS_TREE_NAME',
124            AvId.flags: 'MSV_AV_FLAGS',
125            AvId.timestamp: 'MSV_AV_TIMESTAMP',
126            AvId.single_host: 'MSV_AV_SINGLE_HOST',
127            AvId.target_name: 'MSV_AV_TARGET_NAME',
128            AvId.channel_bindings: 'MSV_AV_CHANNEL_BINDINGS',
129        }
130
131
132class AvFlags(enum.IntFlag):
133    """MsvAvFlags for an AV_PAIR.
134
135    These are the flags that can be set on the MsvAvFlags entry of an NTLM `AV_PAIR`_.
136
137    .. _AV_PAIR:
138        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/83f5e789-660d-4781-8491-5f8c6641f75e
139    """
140    none = 0x00000000
141    constrained = 0x00000001
142    mic = 0x00000002
143    untrusted_spn = 0x00000004
144
145    @classmethod
146    def native_labels(cls) -> typing.Dict["AvFlags", str]:
147        return {
148            AvFlags.constrained: 'AUTHENTICATION_CONSTRAINED',
149            AvFlags.mic: 'MIC_PROVIDED',
150            AvFlags.untrusted_spn: 'UNTRUSTED_SPN_SOURCE',
151        }
152
153
154class MessageType(enum.IntEnum):
155    negotiate = 1
156    challenge = 2
157    authenticate = 3
158
159    @classmethod
160    def native_labels(cls) -> typing.Dict["MessageType", str]:
161        return {
162            MessageType.negotiate: 'NEGOTIATE_MESSAGE',
163            MessageType.challenge: 'CHALLENGE_MESSAGE',
164            MessageType.authenticate: 'AUTHENTICATE_MESSAGE',
165        }
166
167
168def _get_payload_offset(b_data: memoryview, field_offsets: typing.List[int]) -> int:
169    payload_offset = None
170
171    for field_offset in field_offsets:
172        offset = struct.unpack("<I", b_data[field_offset + 4:field_offset + 8].tobytes())[0]
173        if not payload_offset or (offset and offset < payload_offset):
174            payload_offset = offset
175
176    return payload_offset or len(b_data)
177
178
179def _pack_payload(
180    data: typing.Any,
181    b_payload: bytearray,
182    payload_offset: int,
183    pack_func: typing.Optional[typing.Callable[[typing.Any], bytes]] = None,
184) -> typing.Tuple[bytes, int]:
185    if data:
186        b_data = pack_func(data) if pack_func else data
187    else:
188        b_data = b""
189
190    b_payload.extend(b_data)
191    length = len(b_data)
192
193    b_field = (struct.pack("<H", length) * 2) + struct.pack("<I", payload_offset)
194    payload_offset += length
195
196    return b_field, payload_offset
197
198
199def _unpack_payload(
200    b_data: memoryview,
201    field_offset: int,
202    unpack_func: typing.Optional[typing.Callable[[bytes], typing.Any]] = None,
203) -> typing.Any:
204    field_len = struct.unpack("<H", b_data[field_offset:field_offset + 2].tobytes())[0]
205    if field_len:
206        field_offset = struct.unpack("<I", b_data[field_offset + 4:field_offset + 8].tobytes())[0]
207        b_value = b_data[field_offset:field_offset + field_len].tobytes()
208
209        return unpack_func(b_value) if unpack_func else b_value
210
211
212class _NTLMMessageMeta(type):
213    __registry: typing.Dict[int, typing.Type] = {}
214
215    def __init__(cls, name, bases, attributes):
216        cls.__registry[cls.MESSAGE_TYPE] = cls
217
218    def __call__(cls, *args, **kwargs):
219        if '_b_data' in kwargs:
220            message_type = struct.unpack("<I", kwargs['_b_data'][8:12])[0]
221            new_cls = cls.__registry[message_type]
222
223        else:
224            new_cls = cls
225
226        return super(_NTLMMessageMeta, new_cls).__call__(*args, **kwargs)
227
228
229class NTLMMessage(metaclass=_NTLMMessageMeta):
230    """ Base NTLM message class that defines the pack and unpack functions. """
231
232    MESSAGE_TYPE = 0
233    MINIMUM_LENGTH = 0
234
235    def __init__(self, encoding: typing.Optional[str] = None, _b_data: typing.Optional[bytes] = None) -> None:
236        self.signature = b"NTLMSSP\x00" + struct.pack("<I", self.MESSAGE_TYPE)
237        self._encoding = encoding or 'windows-1252'
238
239        if _b_data:
240            if len(_b_data) < self.MINIMUM_LENGTH:
241                raise ValueError("Invalid NTLM %s raw byte length" % self.__class__.__name__)
242
243            self._data = memoryview(bytearray(_b_data))
244
245        else:
246            self._data = memoryview(b"")
247
248    def pack(self) -> bytes:
249        """ Packs the structure to bytes. """
250        return self._data.tobytes()
251
252    @staticmethod
253    def unpack(b_data: bytes, encoding: typing.Optional[str] = None) -> "NTLMMessage":
254        """ Unpacks the structure from bytes. """
255        return NTLMMessage(encoding=encoding, _b_data=b_data)
256
257
258class Negotiate(NTLMMessage):
259    """NTLM Negotiate Message
260
261    This structure represents an NTLM `NEGOTIATE_MESSAGE`_ that can be serialized and deserialized to and from
262    bytes.
263
264    Args:
265        flags: The `NegotiateFlags` that the client has negotiated.
266        domain_name: The `DomainName` of the client authentication domain.
267        workstation: The `Workstation` of the client.
268        version: The `Version` of the client.
269        encoding: The OEM encoding to use for text fields.
270        _b_data: The raw bytes of the message to unpack from.
271
272    .. _NEGOTIATE_MESSAGE:
273        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/b34032e5-3aae-4bc6-84c3-c6d80eadf7f2
274    """
275
276    MESSAGE_TYPE = MessageType.negotiate
277    MINIMUM_LENGTH = 32
278
279    def __init__(
280        self,
281        flags: int = 0,
282        domain_name: typing.Optional[str] = None,
283        workstation: typing.Optional[str] = None,
284        version: typing.Optional["Version"] = None,
285        encoding: typing.Optional[str] = None,
286        _b_data: typing.Optional[bytes] = None,
287    ) -> None:
288        super(Negotiate, self).__init__(encoding=encoding, _b_data=_b_data)
289
290        if not _b_data:
291            b_payload = bytearray()
292
293            payload_offset = 32
294
295            b_version = b""
296            if version:
297                flags |= NegotiateFlags.version
298                b_version = version.pack()
299            payload_offset = _pack_payload(b_version, b_payload, payload_offset)[1]
300
301            b_domain_name = b""
302            if domain_name:
303                flags |= NegotiateFlags.oem_domain_name_supplied
304                b_domain_name = domain_name.encode(self._encoding)
305            b_domain_name_fields, payload_offset = _pack_payload(b_domain_name, b_payload, payload_offset)
306
307            b_workstation = b""
308            if workstation:
309                flags |= NegotiateFlags.oem_workstation_supplied
310                b_workstation = workstation.encode(self._encoding)
311            b_workstation_fields = _pack_payload(b_workstation, b_payload, payload_offset)[0]
312
313            b_data = bytearray(self.signature)
314            b_data.extend(struct.pack("<I", flags))
315            b_data.extend(b_domain_name_fields)
316            b_data.extend(b_workstation_fields)
317            b_data.extend(b_payload)
318
319            self._data = memoryview(b_data)
320
321    @property
322    def flags(self) -> int:
323        """ The negotiate flags for the Negotiate message. """
324        return struct.unpack("<I", self._data[12:16].tobytes())[0]
325
326    @flags.setter
327    def flags(self, value: int) -> None:
328        self._data[12:16] = struct.pack("<I", value)
329
330    @property
331    def domain_name(self) -> typing.Optional[str]:
332        """ The name of the client authentication domain. """
333        return to_text(_unpack_payload(self._data, 16), encoding=self._encoding, errors='replace',
334                       nonstring='passthru')
335
336    @property
337    def workstation(self) -> typing.Optional[str]:
338        """ The name of the client machine. """
339        return to_text(_unpack_payload(self._data, 24), encoding=self._encoding, errors='replace',
340                       nonstring='passthru')
341
342    @property
343    def version(self) -> typing.Optional["Version"]:
344        """ The client NTLM version. """
345        payload_offset = self._payload_offset
346
347        # If the payload offset is at 40 or more then the Version, or at least empty bytes, is in the payload.
348        if payload_offset >= 40:
349            return Version.unpack(self._data[32:40].tobytes())
350
351        else:
352            return None
353
354    @property
355    def _payload_offset(self) -> int:
356        """ Gets the offset of the first payload value. """
357        return _get_payload_offset(self._data, [16, 24])
358
359
360class Challenge(NTLMMessage):
361    """NTLM Challenge Message
362
363    This structure represents an NTLM `CHALLENGE_MESSAGE`_ that can be serialized and deserialized to and from
364    bytes.
365
366    Args:
367        flags: The `NegotiateFlags` that the client has negotiated.
368        server_challenge: The random 64-bit `ServerChallenge` nonce.
369        target_name: The name of the acceptor server.
370        target_info: The variable length `TargetInfo` information.
371        version: The `Version` of the server.
372        encoding: The OEM encoding to use for text fields if `NTLMSSP_NEGOTIATE_UNICODE` was not supported.
373        _b_data: The raw NTLM Challenge bytes to unpack from.
374
375    .. _CHALLENGE_MESSAGE:
376        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/801a4681-8809-4be9-ab0d-61dcfe762786
377    """
378
379    MESSAGE_TYPE = MessageType.challenge
380    MINIMUM_LENGTH = 48
381
382    def __init__(
383        self,
384        flags: int = 0,
385        server_challenge: typing.Optional[bytes] = None,
386        target_name: typing.Optional[str] = None,
387        target_info: typing.Optional["TargetInfo"] = None,
388        version: typing.Optional["Version"] = None,
389        encoding: typing.Optional[str] = None,
390        _b_data: typing.Optional[bytes] = None,
391    ):
392        super(Challenge, self).__init__(encoding=encoding, _b_data=_b_data)
393
394        if _b_data:
395            self._encoding = 'utf-16-le' if self.flags & NegotiateFlags.unicode else self._encoding
396
397        else:
398            self._encoding = 'utf-16-le' if flags & NegotiateFlags.unicode else self._encoding
399
400            b_payload = bytearray()
401            payload_offset = 48
402
403            b_version = b""
404            if version:
405                flags |= NegotiateFlags.version
406                b_version = version.pack()
407            payload_offset = _pack_payload(b_version, b_payload, payload_offset)[1]
408
409            b_target_name = b""
410            if target_name:
411                flags |= NegotiateFlags.request_target
412                b_target_name = target_name.encode(self._encoding)
413            b_target_name_fields, payload_offset = _pack_payload(b_target_name, b_payload, payload_offset)
414
415            b_target_info = b""
416            if target_info:
417                flags |= NegotiateFlags.target_info
418                b_target_info = target_info.pack()
419            b_target_info_fields = _pack_payload(b_target_info, b_payload, payload_offset)[0]
420
421            b_data = bytearray(self.signature)
422            b_data.extend(b_target_name_fields)
423            b_data.extend(struct.pack("<I", flags))
424            b_data.extend(b"\x00" * 8)  # ServerChallenge, set after self._data is initialised.
425            b_data.extend(b"\x00" * 8)  # Reserved
426            b_data.extend(b_target_info_fields)
427            b_data.extend(b_payload)
428
429            self._data = memoryview(b_data)
430
431            if server_challenge:
432                self.server_challenge = server_challenge
433
434    @property
435    def target_name(self) -> typing.Optional[str]:
436        """ The name of the server authentication realm. """
437        return to_text(_unpack_payload(self._data, 12), encoding=self._encoding, nonstring='passthru')
438
439    @property
440    def flags(self) -> int:
441        """ The negotiate flags supported by the server. """
442        return struct.unpack("<I", self._data[20:24].tobytes())[0]
443
444    @flags.setter
445    def flags(self, value: int) -> None:
446        self._data[20:24] = struct.pack("<I", value)
447
448    @property
449    def server_challenge(self) -> bytes:
450        """ The server's 8 byte nonce challenge. """
451        return self._data[24:32].tobytes()
452
453    @server_challenge.setter
454    def server_challenge(self, value: bytes) -> None:
455        if not value or len(value) != 8:
456            raise ValueError("NTLM Challenge ServerChallenge must be 8 bytes long")
457
458        self._data[24:32] = value
459
460    @property
461    def target_info(self) -> typing.Optional["TargetInfo"]:
462        """ The AV_PAIR structures generated by the server. """
463        return _unpack_payload(self._data, 40, lambda d: TargetInfo.unpack(d))
464
465    @property
466    def version(self) -> typing.Optional["Version"]:
467        """ The server NTLM version. """
468        payload_offset = self._payload_offset
469
470        # If the payload offset is at 56 or more then the Version, or at least empty bytes, is in the payload.
471        if payload_offset >= 56:
472            return Version.unpack(self._data[48:56].tobytes())
473
474        else:
475            return None
476
477    @property
478    def _payload_offset(self) -> int:
479        """ Gets the offset of the first payload value. """
480        return _get_payload_offset(self._data, [12, 40])
481
482
483class Authenticate(NTLMMessage):
484    """NTLM Authentication Message
485
486    This structure represents an NTLM `AUTHENTICATION_MESSAGE`_ that can be serialized and deserialized to and from
487    bytes.
488
489    Args:
490        flags: The `NegotiateFlags` that the client has negotiated.
491        lm_challenge_response: The `LmChallengeResponse` for the client's secret.
492        nt_challenge_response: The `NtChallengeResponse` for the client's secret.
493        domain_name: The `DomainName` for the client.
494        username: The `UserName` for the cleint.
495        workstation: The `Workstation` for the client.
496        encrypted_session_key: The `EncryptedRandomSessionKey` for the set up context.
497        version: The `Version` of the client.
498        mic: The `MIC` for the authentication exchange.
499        encoding: The OEM encoding to use for text fields if `NTLMSSP_NEGOTIATE_UNICODE` was not supported.
500        _b_data: The raw NTLM Authenticate bytes to unpack from.
501
502    .. _AUTHENTICATION_MESSAGE:
503        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/033d32cc-88f9-4483-9bf2-b273055038ce
504    """
505
506    MESSAGE_TYPE = MessageType.authenticate
507    MINIMUM_LENGTH = 64
508
509    def __init__(
510        self,
511        flags: int = 0,
512        lm_challenge_response: typing.Optional[bytes] = None,
513        nt_challenge_response: typing.Optional[bytes] = None,
514        domain_name: typing.Optional[str] = None,
515        username: typing.Optional[str] = None,
516        workstation: typing.Optional[str] = None,
517        encrypted_session_key: typing.Optional[bytes] = None,
518        version: typing.Optional["Version"] = None,
519        mic: typing.Optional[bytes] = None,
520        encoding: typing.Optional[str] = None,
521        _b_data: typing.Optional[bytes] = None,
522    ) -> None:
523        super(Authenticate, self).__init__(encoding=encoding, _b_data=_b_data)
524
525        if _b_data:
526            self._encoding = 'utf-16-le' if self.flags & NegotiateFlags.unicode else self._encoding
527
528        else:
529            self._encoding = 'utf-16-le' if flags & NegotiateFlags.unicode else self._encoding
530
531            b_payload = bytearray()
532
533            payload_offset = 64
534
535            # While MS server accept a blank version field, other implementations aren't so kind. No need to be strict
536            # about it and only add the version bytes if it's present.
537            b_version = b""
538            if version:
539                flags |= NegotiateFlags.version
540                b_version = version.pack()
541            payload_offset = _pack_payload(b_version, b_payload, payload_offset)[1]
542
543            # MIC
544            payload_offset = _pack_payload(b"\x00" * 16, b_payload, payload_offset)[1]
545
546            b_lm_response_fields, payload_offset = _pack_payload(lm_challenge_response, b_payload, payload_offset)
547            b_nt_response_fields, payload_offset = _pack_payload(nt_challenge_response, b_payload, payload_offset)
548            b_domain_fields, payload_offset = _pack_payload(domain_name, b_payload, payload_offset,
549                                                            lambda d: d.encode(self._encoding))
550            b_username_fields, payload_offset = _pack_payload(username, b_payload, payload_offset,
551                                                              lambda d: d.encode(self._encoding))
552            b_workstation_fields, payload_offset = _pack_payload(workstation, b_payload, payload_offset,
553                                                                 lambda d: d.encode(self._encoding))
554            if encrypted_session_key:
555                flags |= NegotiateFlags.key_exch
556            b_session_key_fields = _pack_payload(encrypted_session_key, b_payload, payload_offset)[0]
557
558            b_data = bytearray(self.signature)
559            b_data.extend(b_lm_response_fields)
560            b_data.extend(b_nt_response_fields)
561            b_data.extend(b_domain_fields)
562            b_data.extend(b_username_fields)
563            b_data.extend(b_workstation_fields)
564            b_data.extend(b_session_key_fields)
565            b_data.extend(struct.pack("<I", flags))
566            b_data.extend(b_payload)
567
568            self._data = memoryview(b_data)
569
570            if mic:
571                self.mic = mic
572
573    @property
574    def lm_challenge_response(self) -> typing.Optional[bytes]:
575        """ The LmChallengeResponse or None if not set. """
576        return _unpack_payload(self._data, 12)
577
578    @property
579    def nt_challenge_response(self) -> typing.Optional[bytes]:
580        """ The NtChallengeResponse or None if not set. """
581        return _unpack_payload(self._data, 20)
582
583    @property
584    def domain_name(self) -> typing.Optional[str]:
585        """ The domain or computer name hosting the user account. """
586        return to_text(_unpack_payload(self._data, 28), encoding=self._encoding, nonstring='passthru')
587
588    @property
589    def user_name(self) -> typing.Optional[str]:
590        """ The name of the user to be authenticated. """
591        return to_text(_unpack_payload(self._data, 36), encoding=self._encoding, nonstring='passthru')
592
593    @property
594    def workstation(self) -> typing.Optional[str]:
595        """ The name of the computer to which the user is logged on. """
596        return to_text(_unpack_payload(self._data, 44), encoding=self._encoding, nonstring='passthru')
597
598    @property
599    def encrypted_random_session_key(self) -> typing.Optional[bytes]:
600        """ The client's encrypted random session key. """
601        return _unpack_payload(self._data, 52)
602
603    @property
604    def flags(self) -> int:
605        """ The negotiate flags supported by the client and server. """
606        return struct.unpack("<I", self._data[60:64].tobytes())[0]
607
608    @flags.setter
609    def flags(self, value: int) -> None:
610        self._data[60:64] = struct.pack("<I", value)
611
612    @property
613    def version(self) -> typing.Optional["Version"]:
614        """ The client NTLM version. """
615        payload_offset = self._payload_offset
616
617        # If the payload offset is at 64 (no MIC or Version) or 80 (only MIC) then no version is present.
618        if payload_offset not in [64, 80] and payload_offset >= 72:
619            return Version.unpack(self._data[64:72].tobytes())
620
621        else:
622            return None
623
624    @property
625    def mic(self) -> typing.Optional[bytes]:
626        """ The MIC for the Authenticate message. """
627        mic_offset = self._get_mic_offset()
628        if mic_offset:
629            return self._data.tobytes()[mic_offset:mic_offset + 16]
630
631        else:
632            return None
633
634    @mic.setter
635    def mic(self, value: bytes) -> None:
636        if len(value) != 16:
637            raise ValueError("NTLM Authenticate MIC must be 16 bytes long")
638
639        mic_offset = self._get_mic_offset()
640        if mic_offset:
641            self._data[mic_offset:mic_offset + 16] = value
642        else:
643            raise ValueError("Cannot set MIC on an Authenticate message with no MIC present")
644
645    @property
646    def _payload_offset(self) -> int:
647        """ Gets the offset of the first payload value. """
648        return _get_payload_offset(self._data, [12, 20, 28, 36, 44, 52])
649
650    def _get_mic_offset(self) -> int:
651        """ Gets the offset of the MIC structure if present. """
652        payload_offset = self._payload_offset
653
654        # If the payload offset is 88 or more then we must have the Version (8 bytes) and the MIC (16 bytes) plus
655        # any random data after that.
656        if payload_offset >= 88:
657            return 72
658
659        # If the payload offset is between 80 and 88, then we should have just the MIC and no Version.
660        elif payload_offset >= 80:
661            return 64
662
663        # Not enough room for a MIC between the minimum size and the payload offset.
664        else:
665            return 0
666
667
668class FileTime(datetime.datetime):
669    """Windows FILETIME structure.
670
671    FILETIME structure representing number of 100-nanosecond intervals that have elapsed since January 1, 1601 UTC.
672    This subclasses the datetime object to provide a similar interface but with the `nanosecond` attribute.
673
674    Attrs:
675        nanosecond (int): The number of nanoseconds (< 1000) in the FileTime. Note this only has a precision of up to
676            100 nanoseconds.
677
678    .. _FILETIME:
679        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-dtyp/2c57429b-fdd4-488f-b5fc-9e4cf020fcdf
680    """
681
682    _EPOCH_FILETIME = 116444736000000000  # 1970-01-01 as FILETIME.
683
684    def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> "FileTime":
685        ns = 0
686        if 'nanosecond' in kwargs:
687            ns = kwargs.pop('nanosecond')
688
689        dt = super(FileTime, cls).__new__(cls, *args, **kwargs)
690        dt.nanosecond = ns
691
692        return dt
693
694    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
695        super().__init__()
696        self.nanosecond = getattr(self, "nanosecond", None) or 0
697
698    @classmethod
699    def now(cls, tz: typing.Optional[datetime.tzinfo] = None) -> "FileTime":
700        """ Construct a FileTime from the current time and optional time zone info. """
701        return FileTime.from_datetime(datetime.datetime.now(tz=tz))
702
703    @classmethod
704    def from_datetime(cls, dt: datetime.datetime, ns: int = 0) -> "FileTime":
705        """ Creates a FileTime object from a datetime object. """
706        return FileTime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour, minute=dt.minute, second=dt.second,
707                        microsecond=dt.microsecond, tzinfo=dt.tzinfo, nanosecond=ns)
708
709    def __str__(self) -> str:
710        """ Displays the datetime in ISO 8601 including the 100th nanosecond internal like .NET does. """
711        fraction_seconds = ""
712
713        if self.microsecond or self.nanosecond:
714            fraction_seconds = self.strftime('.%f')
715
716            if self.nanosecond:
717                fraction_seconds += str(self.nanosecond // 100)
718
719        timezone = 'Z'
720        if self.tzinfo:
721            utc_offset = self.strftime('%z')
722            timezone = "%s:%s" % (utc_offset[:3], utc_offset[3:])
723
724        # strftime doesn't support dates < 1900 on Python 2.7
725        return '{0}-{1:02d}-{2:02d}T{3:02d}:{4:02d}:{5:02d}{6}{7}'.format(
726            self.year, self.month, self.day, self.hour, self.minute, self.second, fraction_seconds, timezone)
727
728    def pack(self) -> bytes:
729        """ Packs the structure to bytes. """
730        # Make sure we are dealing with a timezone aware datetime
731        utc_tz = datetime.timezone.utc
732        utc_dt = self.replace(tzinfo=self.tzinfo if self.tzinfo else utc_tz)
733
734        # Get the time since UTC EPOCH in microseconds
735        td = utc_dt.astimezone(utc_tz) - datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=utc_tz)
736        epoch_time_ms = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6)
737
738        # Add the EPOCH_FILETIME to the microseconds since EPOCH and finally the nanoseconds part.
739        ns100 = FileTime._EPOCH_FILETIME + (epoch_time_ms * 10) + (self.nanosecond // 100)
740
741        return struct.pack("<Q", ns100)
742
743    @staticmethod
744    def unpack(b_data: bytes) -> "FileTime":
745        """ Unpacks the structure from bytes. """
746        filetime = struct.unpack("<Q", b_data)[0]  # 100 nanosecond intervals since 1601-01-01.
747
748        # Create a datetime object based on the filetime microseconds
749        epoch_time_ms = (filetime - FileTime._EPOCH_FILETIME) // 10
750        dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(microseconds=epoch_time_ms)
751
752        # Create the FileTime object from the datetime object and add the nanoseconds.
753        ns = int(filetime % 10) * 100
754
755        return FileTime.from_datetime(dt, ns=ns)
756
757
758class NTClientChallengeV2:
759    """NTLMv2 Client Challenge
760
761    The `NTLMv2_CLIENT_CHALLENGE`_ structure defines the client challenge in the AUTHENTICATE_MESSAGE. This structure
762    is only used when NTLMv2 authentication is configured and is transported in the NT Challenge Response.
763
764    Args:
765        time_stamp: The timestamp, defaults to the current time.
766        client_challenge: The 8 byte nonce generated by the client.
767        av_pairs: The TargetInfo AV_PAIRS for the client challenge.
768        _b_data: Raw byte string for the NTClientChallengeV2, when set the other args are ignored.
769
770    .. _NTLMv2_CLIENT_CHALLENGE:
771        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/aee311d6-21a7-4470-92a5-c4ecb022a87b
772    """
773
774    def __init__(
775        self,
776        time_stamp: typing.Optional["FileTime"] = None,
777        client_challenge: typing.Optional[bytes] = None,
778        av_pairs: typing.Optional["TargetInfo"] = None,
779        _b_data: typing.Optional[bytes] = None,
780    ) -> None:
781        if _b_data:
782            if len(_b_data) < 32:
783                raise ValueError("Invalid NTClientChallengeV2 raw byte length")
784
785            self._data = memoryview(bytearray(_b_data))
786
787        else:
788            time_stamp = time_stamp or FileTime.now()
789            av_pairs = av_pairs or TargetInfo()
790
791            b_data = bytearray(b"\x01\x01\x00\x00\x00\x00\x00\x00")
792            b_data.extend(time_stamp.pack())
793            b_data.extend(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
794            b_data.extend(av_pairs.pack())
795
796            self._data = memoryview(b_data)
797
798            client_challenge = client_challenge or b"\x00" * 8
799            self.challenge_from_client = client_challenge
800
801    @property
802    def resp_type(self) -> int:
803        """ The current response type version, must be set to 1. """
804        return struct.unpack("B", self._data[:1].tobytes())[0]
805
806    @resp_type.setter
807    def resp_type(self, value: int) -> None:
808        self._data[:1] = struct.pack("B", value)
809
810    @property
811    def hi_resp_type(self) -> int:
812        """ The maximum response type supported, must be set to 1. """
813        return struct.unpack("B", self._data[1:2].tobytes())[0]
814
815    @hi_resp_type.setter
816    def hi_resp_type(self, value: int) -> None:
817        self._data[1:2] = struct.pack("B", value)
818
819    @property
820    def time_stamp(self) -> "FileTime":
821        """ The current system time. """
822        return FileTime.unpack(self._data[8:16].tobytes())
823
824    @time_stamp.setter
825    def time_stamp(self, value: "FileTime") -> None:
826        self._data[8:16] = value.pack()
827
828    @property
829    def challenge_from_client(self) -> bytes:
830        """ 8 byte client challenge. """
831        return self._data[16:24].tobytes()
832
833    @challenge_from_client.setter
834    def challenge_from_client(self, value: bytes) -> None:
835        if len(value) != 8:
836            raise ValueError("NTClientChallengeV2 ChallengeFromClient must be 8 bytes long")
837        self._data[16:24] = value
838
839    @property
840    def av_pairs(self) -> "TargetInfo":
841        """ The target info AV_PAIR structures. """
842        return TargetInfo.unpack(self._data[28:].tobytes())
843
844    def pack(self) -> bytes:
845        """ Packs the NTClientChallengeV2 to bytes. """
846        return self._data.tobytes()
847
848    @staticmethod
849    def unpack(b_data: bytes) -> "NTClientChallengeV2":
850        """ Unpacks the raw bytes to the NTClientChallengeV2 structure. """
851        return NTClientChallengeV2(_b_data=b_data)
852
853
854class TargetInfo(collections.OrderedDict):
855    """A collection of AV_PAIR structures for the TargetInfo field.
856
857    The `AV_PAIR`_ structure defines an attribute/value pair and sequences of these pairs are using in the
858    :class:`Challenge` and :class:`Authenticate` messages. The value for each pair depends on the AvId specified.
859    Each value can be get/set/del like a normal dictionary where the key is the AvId of the value.
860
861    .. _AV_PAIR:
862        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/83f5e789-660d-4781-8491-5f8c6641f75e
863    """
864
865    _FIELD_TYPES = {
866        'text': (AvId.nb_computer_name, AvId.nb_domain_name, AvId.dns_computer_name, AvId.dns_domain_name,
867                 AvId.dns_tree_name, AvId.target_name),
868        'int32': (AvId.flags,),
869        'struct': (AvId.timestamp, AvId.single_host),
870    }
871
872    def __setitem__(self, key: AvId, value: typing.Any) -> None:
873        if isinstance(value, bytes):
874            if key == AvId.timestamp:
875                value = FileTime.unpack(value)
876            elif key == AvId.single_host:
877                value = SingleHost.unpack(value)
878
879        super(TargetInfo, self).__setitem__(key, value)
880
881    def pack(self) -> bytes:
882        """ Packs the structure to bytes. """
883        b_data = io.BytesIO()
884
885        for av_id, value in self.items():
886            # MsvAvEOL should only be set at the end, will just ignore these entries.
887            if av_id == AvId.eol:
888                continue
889
890            if av_id in self._FIELD_TYPES['text']:
891                b_value = value.encode('utf-16-le')
892            elif av_id in self._FIELD_TYPES['int32']:
893                b_value = struct.pack("<I", value)
894            elif av_id in self._FIELD_TYPES['struct']:
895                b_value = value.pack()
896            else:
897                b_value = value
898
899            b_data.write(struct.pack("<HH", av_id, len(b_value)) + b_value)
900
901        b_data.write(b"\x00\x00\x00\x00")  # MsvAvEOL
902        return b_data.getvalue()
903
904    @staticmethod
905    def unpack(b_data: bytes) -> "TargetInfo":
906        """ Unpacks the structure from bytes. """
907        target_info = TargetInfo()
908        b_io = io.BytesIO(b_data)
909
910        b_av_id = b_io.read(2)
911
912        while b_av_id:
913            av_id = struct.unpack("<H", b_av_id)[0]
914            length = struct.unpack("<H", b_io.read(2))[0]
915            b_value = b_io.read(length)
916
917            value: typing.Any
918            if av_id in TargetInfo._FIELD_TYPES['text']:
919                # All AV_PAIRS are UNICODE encoded.
920                value = b_value.decode('utf-16-le')
921
922            elif av_id in TargetInfo._FIELD_TYPES['int32']:
923                value = AvFlags(struct.unpack("<I", b_value)[0])
924
925            elif av_id == AvId.timestamp:
926                value = FileTime.unpack(b_value)
927
928            elif av_id == AvId.single_host:
929                value = SingleHost.unpack(b_value)
930
931            else:
932                value = b_value
933
934            target_info[AvId(av_id)] = value
935            b_av_id = b_io.read(2)
936
937        return target_info
938
939
940class SingleHost:
941    """Single_Host_Data structure for NTLM TargetInfo entry.
942
943    `Single_Host_Data`_ structure allows a client to send machine-specific information within an authentication
944    exchange to services on the same machine. If the server and client platforms are different or if they are on
945    different hosts, then the information MUST be ignores.
946
947    Args:
948        size: A 32-bit unsigned int that defines size of the structure.
949        z4: A 32-bit integer value, currently set to 0.
950        custom_data: An 8-byte platform-specific blob containing info only relevant when the client and server are on
951            the same host.
952        machine_id: A 32-byte random number created at computer startup to identify the calling machine.
953        _b_data: Create a SingleHost object from the raw data byte string.
954
955    Attributes:
956        size: See args.
957        z4: See args.
958        custom_data: See args.
959        machine_id: See args.
960
961    .. _Single_Host_Data:
962        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/f221c061-cc40-4471-95da-d2ff71c85c5b
963    """
964
965    def __init__(
966        self,
967        size: int = 0,
968        z4: int = 0,
969        custom_data: typing.Optional[bytes] = None,
970        machine_id: typing.Optional[bytes] = None,
971        _b_data: typing.Optional[bytes] = None,
972    ) -> None:
973        if _b_data:
974            if len(_b_data) != 48:
975                raise ValueError("SingleHost bytes must have a length of 48")
976            self._data = memoryview(_b_data)
977
978        else:
979            self._data = memoryview(bytearray(48))
980            self.size = size
981            self.z4 = z4
982            self.custom_data = custom_data or b"\x00" * 8
983            self.machine_id = machine_id or b"\x00" * 32
984
985    def __eq__(self, other: object) -> bool:
986        if not isinstance(other, (bytes, SingleHost)):
987            return False
988
989        if isinstance(other, SingleHost):
990            other = other.pack()
991
992        return self.pack() == other
993
994    @property
995    def size(self) -> int:
996        return struct.unpack("<I", self._data[:4].tobytes())[0]
997
998    @size.setter
999    def size(self, value: int) -> None:
1000        self._data[:4] = struct.pack("<I", value)
1001
1002    @property
1003    def z4(self) -> int:
1004        return struct.unpack("<I", self._data[4:8].tobytes())[0]
1005
1006    @z4.setter
1007    def z4(self, value: int) -> None:
1008        self._data[4:8] = struct.pack("<I", value)
1009
1010    @property
1011    def custom_data(self) -> bytes:
1012        return self._data[8:16].tobytes()
1013
1014    @custom_data.setter
1015    def custom_data(self, value: bytes) -> None:
1016        if len(value) != 8:
1017            raise ValueError("custom_data length must be 8 bytes long")
1018
1019        self._data[8:16] = value
1020
1021    @property
1022    def machine_id(self) -> bytes:
1023        return self._data[16:48].tobytes()
1024
1025    @machine_id.setter
1026    def machine_id(self, value: bytes) -> None:
1027        if len(value) != 32:
1028            raise ValueError("machine_id length must be 32 bytes long")
1029
1030        self._data[16:48] = value
1031
1032    def pack(self) -> bytes:
1033        """ Packs the structure to bytes. """
1034        return self._data.tobytes()
1035
1036    @staticmethod
1037    def unpack(b_data: bytes) -> "SingleHost":
1038        """ Creates a SignleHost object from raw bytes. """
1039        return SingleHost(_b_data=b_data)
1040
1041
1042class Version:
1043    """A structure contains the OS information.
1044
1045    The `VERSION`_ structure contains operating system version information that SHOULD be ignored. This structure is
1046    used for debugging purposes only and its value does not affect NTLM message processing. It is populated in the NTLM
1047    messages only if `NTLMSSP_NEGOTIATE_VERSION` (`NegotiateFlags.version`) is negotiated.
1048
1049    Args:
1050        major: See args. The 8-bit unsigned int for the version major part.
1051        minor: The 8-bit unsigned int for the version minor part.
1052        build: The 16-bit unsigned int for the version build part.
1053        revision: An 8-bit unsigned integer for the current NTLMSSP revision. This field SHOULD be `0x0F`.
1054        _b_data: Create a Version object from the raw data byte string.
1055
1056    Attrs:
1057        major: See args.
1058        minor: See args.
1059        build: See args.
1060        reserved: A reserved 3-byte field that isn't used in the NTLM spec.
1061        revision: See args.
1062
1063    .. _VERSION:
1064        https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-nlmp/b1a6ceb2-f8ad-462b-b5af-f18527c48175
1065    """
1066
1067    def __init__(
1068        self,
1069        major: int = 0,
1070        minor: int = 0,
1071        build: int = 0,
1072        revision: int = 0x0F,
1073        _b_data: typing.Optional[bytes] = None,
1074    ) -> None:
1075        if _b_data:
1076            if len(_b_data) != 8:
1077                raise ValueError("Version bytes must have a length of 8")
1078
1079            self._data = memoryview(_b_data)
1080
1081        else:
1082            self._data = memoryview(bytearray(8))
1083            self.major = major
1084            self.minor = minor
1085            self.build = build
1086            self.revision = revision
1087
1088    def __eq__(self, other: object) -> bool:
1089        if not isinstance(other, (bytes, Version)):
1090            return False
1091
1092        if isinstance(other, Version):
1093            other = other.pack()
1094
1095        return self.pack() == other
1096
1097    def __len__(self):
1098        return 8
1099
1100    @property
1101    def major(self) -> int:
1102        return struct.unpack("B", self._data[0:1].tobytes())[0]
1103
1104    @major.setter
1105    def major(self, value: int) -> None:
1106        self._data[0:1] = struct.pack("B", value)
1107
1108    @property
1109    def minor(self) -> int:
1110        return struct.unpack("B", self._data[1:2].tobytes())[0]
1111
1112    @minor.setter
1113    def minor(self, value: int) -> None:
1114        self._data[1:2] = struct.pack("B", value)
1115
1116    @property
1117    def build(self) -> int:
1118        return struct.unpack("<H", self._data[2:4].tobytes())[0]
1119
1120    @build.setter
1121    def build(self, value: int) -> None:
1122        self._data[2:4] = struct.pack("<H", value)
1123
1124    @property
1125    def reserved(self) -> bytes:
1126        return self._data[4:7].tobytes()
1127
1128    @property
1129    def revision(self) -> int:
1130        return struct.unpack("B", self._data[7:8].tobytes())[0]
1131
1132    @revision.setter
1133    def revision(self, value: int) -> None:
1134        self._data[7:8] = struct.pack("B", value)
1135
1136    def __repr__(self) -> str:
1137        return "<{0}.{1} {2}.{3}.{4}.{5}>".format(type(self).__module__, type(self).__name__, self.major, self.minor,
1138                                                  self.build, self.revision)
1139
1140    def __str__(self) -> str:
1141        return "%s.%s.%s.%s" % (self.major, self.minor, self.build, self.revision)
1142
1143    def pack(self) -> bytes:
1144        """ Packs the structure to bytes. """
1145        return self._data.tobytes()
1146
1147    @staticmethod
1148    def get_current() -> "Version":
1149        """ Generates an NTLM Version structure based on the pyspnego package version. """
1150        versions = []
1151        for v in pyspnego_version.split('.', 3):
1152            if not v:
1153                continue
1154
1155            match = re.match(r'^(\d+)', v)
1156            if match:
1157                versions.append(int(match.group(1)))
1158
1159        versions += [0] * (3 - len(versions))
1160
1161        return Version(major=int(versions[0]), minor=int(versions[1]), build=int(versions[2]))
1162
1163    @staticmethod
1164    def unpack(b_data: bytes) -> "Version":
1165        """ Creates a Version object from raw bytes. """
1166        return Version(_b_data=b_data)
1167