1import pathlib 2import stat 3import sys 4from logging import getLogger 5from typing import Union 6 7if sys.platform == "win32": 8 import ctypes 9 from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID, LPWSTR 10 11 _stdcall_libraries = {} 12 _stdcall_libraries['kernel32'] = ctypes.WinDLL('kernel32') 13 CloseHandle = _stdcall_libraries['kernel32'].CloseHandle 14 CreateFileW = _stdcall_libraries['kernel32'].CreateFileW 15 DeviceIoControl = _stdcall_libraries['kernel32'].DeviceIoControl 16 GetFileAttributesW = _stdcall_libraries['kernel32'].GetFileAttributesW 17 OPEN_EXISTING = 3 18 GENERIC_READ = 2147483648 19 FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000 20 FSCTL_GET_REPARSE_POINT = 0x000900A8 21 FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 22 IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003 23 IO_REPARSE_TAG_SYMLINK = 0xA000000C 24 MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16 * 1024 25 26 def _check_bit(val: int, flag: int) -> bool: 27 return bool(val & flag == flag) 28 29 class SymbolicLinkReparseBuffer(ctypes.Structure): 30 """ Implementing the below in Python: 31 32 typedef struct _REPARSE_DATA_BUFFER { 33 ULONG ReparseTag; 34 USHORT ReparseDataLength; 35 USHORT Reserved; 36 union { 37 struct { 38 USHORT SubstituteNameOffset; 39 USHORT SubstituteNameLength; 40 USHORT PrintNameOffset; 41 USHORT PrintNameLength; 42 ULONG Flags; 43 WCHAR PathBuffer[1]; 44 } SymbolicLinkReparseBuffer; 45 struct { 46 USHORT SubstituteNameOffset; 47 USHORT SubstituteNameLength; 48 USHORT PrintNameOffset; 49 USHORT PrintNameLength; 50 WCHAR PathBuffer[1]; 51 } MountPointReparseBuffer; 52 struct { 53 UCHAR DataBuffer[1]; 54 } GenericReparseBuffer; 55 } DUMMYUNIONNAME; 56 } REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER; 57 """ 58 # See https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/content/ntifs/ns-ntifs-_reparse_data_buffer 59 _fields_ = [ 60 ('flags', ctypes.c_ulong), 61 ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 20)) 62 ] 63 64 class MountReparseBuffer(ctypes.Structure): 65 _fields_ = [ 66 ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 16)), 67 ] 68 69 class ReparseBufferField(ctypes.Union): 70 _fields_ = [ 71 ('symlink', SymbolicLinkReparseBuffer), 72 ('mount', MountReparseBuffer) 73 ] 74 75 class ReparseBuffer(ctypes.Structure): 76 _anonymous_ = ("u",) 77 _fields_ = [ 78 ('reparse_tag', ctypes.c_ulong), 79 ('reparse_data_length', ctypes.c_ushort), 80 ('reserved', ctypes.c_ushort), 81 ('substitute_name_offset', ctypes.c_ushort), 82 ('substitute_name_length', ctypes.c_ushort), 83 ('print_name_offset', ctypes.c_ushort), 84 ('print_name_length', ctypes.c_ushort), 85 ('u', ReparseBufferField) 86 ] 87 88 def is_reparse_point(path: Union[str, pathlib.Path]) -> bool: 89 GetFileAttributesW.argtypes = [LPCWSTR] 90 GetFileAttributesW.restype = DWORD 91 return _check_bit(GetFileAttributesW(str(path)), stat.FILE_ATTRIBUTE_REPARSE_POINT) 92 93 def readlink(path: Union[str, pathlib.Path]) -> Union[str, pathlib.WindowsPath]: 94 # FILE_FLAG_OPEN_REPARSE_POINT alone is not enough if 'path' 95 # is a symbolic link to a directory or a NTFS junction. 96 # We need to set FILE_FLAG_BACKUP_SEMANTICS as well. 97 # See https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-createfilea 98 99 # description from _winapi.c:601 100 # /* REPARSE_DATA_BUFFER usage is heavily under-documented, especially for 101 # junction points. Here's what I've learned along the way: 102 # - A junction point has two components: a print name and a substitute 103 # name. They both describe the link target, but the substitute name is 104 # the physical target and the print name is shown in directory listings. 105 # - The print name must be a native name, prefixed with "\??\". 106 # - Both names are stored after each other in the same buffer (the 107 # PathBuffer) and both must be NUL-terminated. 108 # - There are four members defining their respective offset and length 109 # inside PathBuffer: SubstituteNameOffset, SubstituteNameLength, 110 # PrintNameOffset and PrintNameLength. 111 # - The total size we need to allocate for the REPARSE_DATA_BUFFER, thus, 112 # is the sum of: 113 # - the fixed header size (REPARSE_DATA_BUFFER_HEADER_SIZE) 114 # - the size of the MountPointReparseBuffer member without the PathBuffer 115 # - the size of the prefix ("\??\") in bytes 116 # - the size of the print name in bytes 117 # - the size of the substitute name in bytes 118 # - the size of two NUL terminators in bytes */ 119 120 target_is_path = isinstance(path, pathlib.Path) 121 if target_is_path: 122 target = str(path) 123 else: 124 target = path 125 CreateFileW.argtypes = [LPWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE] 126 CreateFileW.restype = HANDLE 127 DeviceIoControl.argtypes = [HANDLE, DWORD, LPVOID, DWORD, LPVOID, DWORD, LPDWORD, LPVOID] 128 DeviceIoControl.restype = BOOL 129 handle = HANDLE(CreateFileW(target, GENERIC_READ, 0, None, OPEN_EXISTING, 130 FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT, 0)) 131 buf = ReparseBuffer() 132 ret = DWORD(0) 133 status = DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0, ctypes.byref(buf), 134 MAXIMUM_REPARSE_DATA_BUFFER_SIZE, ctypes.byref(ret), None) 135 CloseHandle(handle) 136 if not status: 137 logger = getLogger(__file__) 138 logger.error("Failed IOCTL access to REPARSE_POINT {})".format(target)) 139 raise ValueError("not a symbolic link or access permission violation") 140 141 if buf.reparse_tag == IO_REPARSE_TAG_SYMLINK: 142 offset = buf.substitute_name_offset 143 ending = offset + buf.substitute_name_length 144 rpath = bytearray(buf.symlink.path_buffer)[offset:ending].decode('UTF-16-LE') 145 elif buf.reparse_tag == IO_REPARSE_TAG_MOUNT_POINT: 146 offset = buf.substitute_name_offset 147 ending = offset + buf.substitute_name_length 148 rpath = bytearray(buf.mount.path_buffer)[offset:ending].decode('UTF-16-LE') 149 else: 150 raise ValueError("not a symbolic link") 151 # on posixmodule.c:7859 in py38, we do that 152 # ``` 153 # else if (rdb->ReparseTag == IO_REPARSE_TAG_MOUNT_POINT) 154 # { 155 # name = (wchar_t *)((char*)rdb->MountPointReparseBuffer.PathBuffer + 156 # rdb->MountPointReparseBuffer.SubstituteNameOffset); 157 # nameLen = rdb->MountPointReparseBuffer.SubstituteNameLength / sizeof(wchar_t); 158 # } 159 # else 160 # { 161 # PyErr_SetString(PyExc_ValueError, "not a symbolic link"); 162 # } 163 # if (nameLen > 4 && wcsncmp(name, L"\\??\\", 4) == 0) { 164 # /* Our buffer is mutable, so this is okay */ 165 # name[1] = L'\\'; 166 # } 167 # ``` 168 # so substitute prefix here. 169 if rpath.startswith('\\??\\'): 170 rpath = '\\\\' + rpath[2:] 171 if target_is_path: 172 return pathlib.WindowsPath(rpath) 173 else: 174 return rpath 175