1import pathlib
2import stat
3import sys
4from logging import getLogger
5from typing import Union
6
7if sys.platform == "win32":
8    import ctypes
9    from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID, LPWSTR
10
11    _stdcall_libraries = {}
12    _stdcall_libraries['kernel32'] = ctypes.WinDLL('kernel32')
13    CloseHandle = _stdcall_libraries['kernel32'].CloseHandle
14    CreateFileW = _stdcall_libraries['kernel32'].CreateFileW
15    DeviceIoControl = _stdcall_libraries['kernel32'].DeviceIoControl
16    GetFileAttributesW = _stdcall_libraries['kernel32'].GetFileAttributesW
17    OPEN_EXISTING = 3
18    GENERIC_READ = 2147483648
19    FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
20    FSCTL_GET_REPARSE_POINT = 0x000900A8
21    FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
22    IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003
23    IO_REPARSE_TAG_SYMLINK = 0xA000000C
24    MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16 * 1024
25
26    def _check_bit(val: int, flag: int) -> bool:
27        return bool(val & flag == flag)
28
    class SymbolicLinkReparseBuffer(ctypes.Structure):
        """Symlink-specific tail of the Windows REPARSE_DATA_BUFFER.

        Only the ``Flags`` and ``PathBuffer`` members of the C
        ``SymbolicLinkReparseBuffer`` struct are modelled here.  The four name
        offset/length members are declared once on ``ReparseBuffer``, because
        the symlink and mount-point layouts share them.

        The complete C definition, for reference:

        typedef struct _REPARSE_DATA_BUFFER {
            ULONG  ReparseTag;
            USHORT ReparseDataLength;
            USHORT Reserved;
            union {
                struct {
                    USHORT SubstituteNameOffset;
                    USHORT SubstituteNameLength;
                    USHORT PrintNameOffset;
                    USHORT PrintNameLength;
                    ULONG Flags;
                    WCHAR PathBuffer[1];
                } SymbolicLinkReparseBuffer;
                struct {
                    USHORT SubstituteNameOffset;
                    USHORT SubstituteNameLength;
                    USHORT PrintNameOffset;
                    USHORT PrintNameLength;
                    WCHAR PathBuffer[1];
                } MountPointReparseBuffer;
                struct {
                    UCHAR  DataBuffer[1];
                } GenericReparseBuffer;
            } DUMMYUNIONNAME;
        } REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER;
        """
        # See https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/content/ntifs/ns-ntifs-_reparse_data_buffer
        _fields_ = [
            # Flags member of the symlink layout.
            ('flags', ctypes.c_ulong),
            # Raw PathBuffer bytes (UTF-16-LE names).  The 20 bytes subtracted
            # are the fields that precede it: tag (4) + data length (2) +
            # reserved (2) + four USHORT offsets/lengths (8) + flags (4).
            ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 20))
        ]
63
    class MountReparseBuffer(ctypes.Structure):
        """Mount-point (junction) specific tail of REPARSE_DATA_BUFFER.

        Holds only the raw ``PathBuffer`` bytes; the name offset/length
        members that precede it are declared on ``ReparseBuffer``.
        """
        _fields_ = [
            # Raw PathBuffer bytes (UTF-16-LE names).  The 16 bytes subtracted
            # are the fields that precede it: tag (4) + data length (2) +
            # reserved (2) + four USHORT offsets/lengths (8).
            ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 16)),
        ]
68
    class ReparseBufferField(ctypes.Union):
        """Union of the symlink and mount-point views of the reparse data tail.

        Both members overlay the same bytes; which one is meaningful depends
        on the reparse tag stored in the enclosing ``ReparseBuffer``.
        """
        _fields_ = [
            # View used when the tag is IO_REPARSE_TAG_SYMLINK.
            ('symlink', SymbolicLinkReparseBuffer),
            # View used when the tag is IO_REPARSE_TAG_MOUNT_POINT.
            ('mount', MountReparseBuffer)
        ]
74
    class ReparseBuffer(ctypes.Structure):
        """Python view of REPARSE_DATA_BUFFER as filled by FSCTL_GET_REPARSE_POINT.

        The common header is followed by the four name offset/length members
        that the symlink and mount-point layouts share, then by the
        tag-specific remainder.
        """
        # Anonymous union: its members are reachable directly on an instance,
        # i.e. ``buf.symlink`` and ``buf.mount``.
        _anonymous_ = ("u",)
        _fields_ = [
            # Common REPARSE_DATA_BUFFER header.
            ('reparse_tag', ctypes.c_ulong),
            ('reparse_data_length', ctypes.c_ushort),
            ('reserved', ctypes.c_ushort),
            # Byte offsets/lengths of the two names inside PathBuffer.
            ('substitute_name_offset', ctypes.c_ushort),
            ('substitute_name_length', ctypes.c_ushort),
            ('print_name_offset', ctypes.c_ushort),
            ('print_name_length', ctypes.c_ushort),
            # Tag-specific remainder (symlink flags + path buffer, or path buffer).
            ('u', ReparseBufferField)
        ]
87
88    def is_reparse_point(path: Union[str, pathlib.Path]) -> bool:
89        GetFileAttributesW.argtypes = [LPCWSTR]
90        GetFileAttributesW.restype = DWORD
91        return _check_bit(GetFileAttributesW(str(path)), stat.FILE_ATTRIBUTE_REPARSE_POINT)
92
93    def readlink(path: Union[str, pathlib.Path]) -> Union[str, pathlib.WindowsPath]:
94        # FILE_FLAG_OPEN_REPARSE_POINT alone is not enough if 'path'
95        # is a symbolic link to a directory or a NTFS junction.
96        # We need to set FILE_FLAG_BACKUP_SEMANTICS as well.
97        # See https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-createfilea
98
99        # description from _winapi.c:601
100        # /* REPARSE_DATA_BUFFER usage is heavily under-documented, especially for
101        #  junction points. Here's what I've learned along the way:
102        #  - A junction point has two components: a print name and a substitute
103        #  name. They both describe the link target, but the substitute name is
104        #  the physical target and the print name is shown in directory listings.
105        #  - The print name must be a native name, prefixed with "\??\".
106        #  - Both names are stored after each other in the same buffer (the
107        #  PathBuffer) and both must be NUL-terminated.
108        #  - There are four members defining their respective offset and length
109        #  inside PathBuffer: SubstituteNameOffset, SubstituteNameLength,
110        #  PrintNameOffset and PrintNameLength.
111        #  - The total size we need to allocate for the REPARSE_DATA_BUFFER, thus,
112        #  is the sum of:
113        #  - the fixed header size (REPARSE_DATA_BUFFER_HEADER_SIZE)
114        #  - the size of the MountPointReparseBuffer member without the PathBuffer
115        #  - the size of the prefix ("\??\") in bytes
116        #  - the size of the print name in bytes
117        #  - the size of the substitute name in bytes
118        #  - the size of two NUL terminators in bytes */
119
120        target_is_path = isinstance(path, pathlib.Path)
121        if target_is_path:
122            target = str(path)
123        else:
124            target = path
125        CreateFileW.argtypes = [LPWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE]
126        CreateFileW.restype = HANDLE
127        DeviceIoControl.argtypes = [HANDLE, DWORD, LPVOID, DWORD, LPVOID, DWORD, LPDWORD, LPVOID]
128        DeviceIoControl.restype = BOOL
129        handle = HANDLE(CreateFileW(target, GENERIC_READ, 0, None, OPEN_EXISTING,
130                                    FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT, 0))
131        buf = ReparseBuffer()
132        ret = DWORD(0)
133        status = DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0, ctypes.byref(buf),
134                                 MAXIMUM_REPARSE_DATA_BUFFER_SIZE, ctypes.byref(ret), None)
135        CloseHandle(handle)
136        if not status:
137            logger = getLogger(__file__)
138            logger.error("Failed IOCTL access to REPARSE_POINT {})".format(target))
139            raise ValueError("not a symbolic link or access permission violation")
140
141        if buf.reparse_tag == IO_REPARSE_TAG_SYMLINK:
142            offset = buf.substitute_name_offset
143            ending = offset + buf.substitute_name_length
144            rpath = bytearray(buf.symlink.path_buffer)[offset:ending].decode('UTF-16-LE')
145        elif buf.reparse_tag == IO_REPARSE_TAG_MOUNT_POINT:
146            offset = buf.substitute_name_offset
147            ending = offset + buf.substitute_name_length
148            rpath = bytearray(buf.mount.path_buffer)[offset:ending].decode('UTF-16-LE')
149        else:
150            raise ValueError("not a symbolic link")
151        # on posixmodule.c:7859 in py38, we do that
152        # ```
153        # else if (rdb->ReparseTag == IO_REPARSE_TAG_MOUNT_POINT)
154        # {
155        #    name = (wchar_t *)((char*)rdb->MountPointReparseBuffer.PathBuffer +
156        #                       rdb->MountPointReparseBuffer.SubstituteNameOffset);
157        #    nameLen = rdb->MountPointReparseBuffer.SubstituteNameLength / sizeof(wchar_t);
158        # }
159        # else
160        # {
161        #    PyErr_SetString(PyExc_ValueError, "not a symbolic link");
162        # }
163        # if (nameLen > 4 && wcsncmp(name, L"\\??\\", 4) == 0) {
164        #             /* Our buffer is mutable, so this is okay */
165        #             name[1] = L'\\';
166        #         }
167        # ```
168        # so substitute prefix here.
169        if rpath.startswith('\\??\\'):
170            rpath = '\\\\' + rpath[2:]
171        if target_is_path:
172            return pathlib.WindowsPath(rpath)
173        else:
174            return rpath
175