1"""Path utility functions.""" 2 3# Copyright (c) Jupyter Development Team. 4# Distributed under the terms of the Modified BSD License. 5 6# Derived from IPython.utils.path, which is 7# Copyright (c) IPython Development Team. 8# Distributed under the terms of the Modified BSD License. 9 10 11import os 12import sys 13import stat 14import errno 15import site 16import tempfile 17import warnings 18from pathlib import Path 19 20from contextlib import contextmanager 21 22pjoin = os.path.join 23 24# UF_HIDDEN is a stat flag not defined in the stat module. 25# It is used by BSD to indicate hidden files. 26UF_HIDDEN = getattr(stat, 'UF_HIDDEN', 32768) 27 28 29def envset(name): 30 """Return True if the given environment variable is set 31 32 An environment variable is considered set if it is assigned to a value 33 other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive) 34 """ 35 return os.environ.get(name, 'no').lower() not in ['no', 'n', 'false', 'off', '0', '0.0'] 36 37def get_home_dir(): 38 """Get the real path of the home directory""" 39 homedir = os.path.expanduser('~') 40 # Next line will make things work even when /home/ is a symlink to 41 # /usr/home as it is on FreeBSD, for example 42 homedir = str(Path(homedir).resolve()) 43 return homedir 44 45_dtemps = {} 46def _mkdtemp_once(name): 47 """Make or reuse a temporary directory. 48 49 If this is called with the same name in the same process, it will return 50 the same directory. 51 """ 52 try: 53 return _dtemps[name] 54 except KeyError: 55 d = _dtemps[name] = tempfile.mkdtemp(prefix=name + '-') 56 return d 57 58def jupyter_config_dir(): 59 """Get the Jupyter config directory for this platform and user. 60 61 Returns JUPYTER_CONFIG_DIR if defined, else ~/.jupyter 62 """ 63 64 env = os.environ 65 home_dir = get_home_dir() 66 67 if env.get('JUPYTER_NO_CONFIG'): 68 return _mkdtemp_once('jupyter-clean-cfg') 69 70 if env.get('JUPYTER_CONFIG_DIR'): 71 return env['JUPYTER_CONFIG_DIR'] 72 73 return pjoin(home_dir, '.jupyter') 74 75 76def jupyter_data_dir(): 77 """Get the config directory for Jupyter data files for this platform and user. 78 79 These are non-transient, non-configuration files. 80 81 Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path. 82 """ 83 env = os.environ 84 85 if env.get('JUPYTER_DATA_DIR'): 86 return env['JUPYTER_DATA_DIR'] 87 88 home = get_home_dir() 89 90 if sys.platform == 'darwin': 91 return os.path.join(home, 'Library', 'Jupyter') 92 elif os.name == 'nt': 93 appdata = os.environ.get('APPDATA', None) 94 if appdata: 95 return str(Path(appdata, 'jupyter').resolve()) 96 else: 97 return pjoin(jupyter_config_dir(), 'data') 98 else: 99 # Linux, non-OS X Unix, AIX, etc. 100 xdg = env.get("XDG_DATA_HOME", None) 101 if not xdg: 102 xdg = pjoin(home, '.local', 'share') 103 return pjoin(xdg, 'jupyter') 104 105 106def jupyter_runtime_dir(): 107 """Return the runtime dir for transient jupyter files. 108 109 Returns JUPYTER_RUNTIME_DIR if defined. 110 111 The default is now (data_dir)/runtime on all platforms; 112 we no longer use XDG_RUNTIME_DIR after various problems. 113 """ 114 env = os.environ 115 116 if env.get('JUPYTER_RUNTIME_DIR'): 117 return env['JUPYTER_RUNTIME_DIR'] 118 119 return pjoin(jupyter_data_dir(), 'runtime') 120 121 122if os.name == 'nt': 123 programdata = os.environ.get('PROGRAMDATA', None) 124 if programdata: 125 SYSTEM_JUPYTER_PATH = [pjoin(programdata, 'jupyter')] 126 else: # PROGRAMDATA is not defined by default on XP. 127 SYSTEM_JUPYTER_PATH = [os.path.join(sys.prefix, 'share', 'jupyter')] 128else: 129 SYSTEM_JUPYTER_PATH = [ 130 "/usr/local/share/jupyter", 131 "/usr/share/jupyter", 132 ] 133 134ENV_JUPYTER_PATH = [os.path.join(sys.prefix, 'share', 'jupyter')] 135 136 137def jupyter_path(*subdirs): 138 """Return a list of directories to search for data files 139 140 JUPYTER_PATH environment variable has highest priority. 141 142 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level 143 directories will have priority over user-level directories. 144 145 If the Python site.ENABLE_USER_SITE variable is True, we also add the 146 appropriate Python user site subdirectory to the user-level directories. 147 148 149 If ``*subdirs`` are given, that subdirectory will be added to each element. 150 151 Examples: 152 153 >>> jupyter_path() 154 ['~/.local/jupyter', '/usr/local/share/jupyter'] 155 >>> jupyter_path('kernels') 156 ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels'] 157 """ 158 159 paths = [] 160 161 # highest priority is explicit environment variable 162 if os.environ.get('JUPYTER_PATH'): 163 paths.extend( 164 p.rstrip(os.sep) 165 for p in os.environ['JUPYTER_PATH'].split(os.pathsep) 166 ) 167 168 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag 169 user = [jupyter_data_dir()] 170 if site.ENABLE_USER_SITE: 171 # Check if site.getuserbase() exists to be compatible with virtualenv, 172 # which often does not have this method. 173 if hasattr(site, 'getuserbase'): 174 userbase = site.getuserbase() 175 else: 176 userbase = site.USER_BASE 177 userdir = os.path.join(userbase, 'share', 'jupyter') 178 if userdir not in user: 179 user.append(userdir) 180 181 env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH] 182 183 if envset('JUPYTER_PREFER_ENV_PATH'): 184 paths.extend(env) 185 paths.extend(user) 186 else: 187 paths.extend(user) 188 paths.extend(env) 189 190 # finally, system 191 paths.extend(SYSTEM_JUPYTER_PATH) 192 193 # add subdir, if requested 194 if subdirs: 195 paths = [ pjoin(p, *subdirs) for p in paths ] 196 return paths 197 198 199if os.name == 'nt': 200 programdata = os.environ.get('PROGRAMDATA', None) 201 if programdata: 202 SYSTEM_CONFIG_PATH = [os.path.join(programdata, 'jupyter')] 203 else: # PROGRAMDATA is not defined by default on XP. 204 SYSTEM_CONFIG_PATH = [] 205else: 206 SYSTEM_CONFIG_PATH = [ 207 "/usr/local/etc/jupyter", 208 "/etc/jupyter", 209 ] 210 211ENV_CONFIG_PATH = [os.path.join(sys.prefix, 'etc', 'jupyter')] 212 213 214def jupyter_config_path(): 215 """Return the search path for Jupyter config files as a list. 216 217 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the 218 environment-level directories will have priority over user-level 219 directories. 220 221 If the Python site.ENABLE_USER_SITE variable is True, we also add the 222 appropriate Python user site subdirectory to the user-level directories. 223 """ 224 if os.environ.get('JUPYTER_NO_CONFIG'): 225 # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set. 226 return [jupyter_config_dir()] 227 228 paths = [] 229 230 # highest priority is explicit environment variable 231 if os.environ.get('JUPYTER_CONFIG_PATH'): 232 paths.extend( 233 p.rstrip(os.sep) 234 for p in os.environ['JUPYTER_CONFIG_PATH'].split(os.pathsep) 235 ) 236 237 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag 238 user = [jupyter_config_dir()] 239 if site.ENABLE_USER_SITE: 240 # Check if site.getuserbase() exists to be compatible with virtualenv, 241 # which often does not have this method. 242 if hasattr(site, 'getuserbase'): 243 userbase = site.getuserbase() 244 else: 245 userbase = site.USER_BASE 246 247 userdir = os.path.join(userbase, 'etc', 'jupyter') 248 if userdir not in user: 249 user.append(userdir) 250 251 env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH] 252 253 if envset('JUPYTER_PREFER_ENV_PATH'): 254 paths.extend(env) 255 paths.extend(user) 256 else: 257 paths.extend(user) 258 paths.extend(env) 259 260 # Finally, system path 261 paths.extend(SYSTEM_CONFIG_PATH) 262 return paths 263 264 265def exists(path): 266 """Replacement for `os.path.exists` which works for host mapped volumes 267 on Windows containers 268 """ 269 try: 270 os.lstat(path) 271 except OSError: 272 return False 273 return True 274 275 276def is_file_hidden_win(abs_path, stat_res=None): 277 """Is a file hidden? 278 279 This only checks the file itself; it should be called in combination with 280 checking the directory containing the file. 281 282 Use is_hidden() instead to check the file and its parent directories. 283 284 Parameters 285 ---------- 286 abs_path : unicode 287 The absolute path to check. 288 stat_res : os.stat_result, optional 289 The result of calling stat() on abs_path. If not passed, this function 290 will call stat() internally. 291 """ 292 if os.path.basename(abs_path).startswith('.'): 293 return True 294 295 if stat_res is None: 296 try: 297 stat_res = os.stat(abs_path) 298 except OSError as e: 299 if e.errno == errno.ENOENT: 300 return False 301 raise 302 303 try: 304 if stat_res.st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN: 305 return True 306 except AttributeError: 307 # allow AttributeError on PyPy for Windows 308 # 'stat_result' object has no attribute 'st_file_attributes' 309 # https://foss.heptapod.net/pypy/pypy/-/issues/3469 310 warnings.warn("hidden files are not detectable on this system, so no file will be marked as hidden.") 311 pass 312 313 return False 314 315 316def is_file_hidden_posix(abs_path, stat_res=None): 317 """Is a file hidden? 318 319 This only checks the file itself; it should be called in combination with 320 checking the directory containing the file. 321 322 Use is_hidden() instead to check the file and its parent directories. 323 324 Parameters 325 ---------- 326 abs_path : unicode 327 The absolute path to check. 328 stat_res : os.stat_result, optional 329 The result of calling stat() on abs_path. If not passed, this function 330 will call stat() internally. 331 """ 332 if os.path.basename(abs_path).startswith('.'): 333 return True 334 335 if stat_res is None or stat.S_ISLNK(stat_res.st_mode): 336 try: 337 stat_res = os.stat(abs_path) 338 except OSError as e: 339 if e.errno == errno.ENOENT: 340 return False 341 raise 342 343 # check that dirs can be listed 344 if stat.S_ISDIR(stat_res.st_mode): 345 # use x-access, not actual listing, in case of slow/large listings 346 if not os.access(abs_path, os.X_OK | os.R_OK): 347 return True 348 349 # check UF_HIDDEN 350 if getattr(stat_res, 'st_flags', 0) & UF_HIDDEN: 351 return True 352 353 return False 354 355 356if sys.platform == 'win32': 357 is_file_hidden = is_file_hidden_win 358else: 359 is_file_hidden = is_file_hidden_posix 360 361 362def is_hidden(abs_path, abs_root=''): 363 """Is a file hidden or contained in a hidden directory? 364 365 This will start with the rightmost path element and work backwards to the 366 given root to see if a path is hidden or in a hidden directory. Hidden is 367 determined by either name starting with '.' or the UF_HIDDEN flag as 368 reported by stat. 369 370 If abs_path is the same directory as abs_root, it will be visible even if 371 that is a hidden folder. This only checks the visibility of files 372 and directories *within* abs_root. 373 374 Parameters 375 ---------- 376 abs_path : unicode 377 The absolute path to check for hidden directories. 378 abs_root : unicode 379 The absolute path of the root directory in which hidden directories 380 should be checked for. 381 """ 382 if os.path.normpath(abs_path) == os.path.normpath(abs_root): 383 return False 384 385 if is_file_hidden(abs_path): 386 return True 387 388 if not abs_root: 389 abs_root = abs_path.split(os.sep, 1)[0] + os.sep 390 inside_root = abs_path[len(abs_root):] 391 if any(part.startswith('.') for part in inside_root.split(os.sep)): 392 return True 393 394 # check UF_HIDDEN on any location up to root. 395 # is_file_hidden() already checked the file, so start from its parent dir 396 path = os.path.dirname(abs_path) 397 while path and path.startswith(abs_root) and path != abs_root: 398 if not exists(path): 399 path = os.path.dirname(path) 400 continue 401 try: 402 # may fail on Windows junctions 403 st = os.lstat(path) 404 except OSError: 405 return True 406 if getattr(st, 'st_flags', 0) & UF_HIDDEN: 407 return True 408 path = os.path.dirname(path) 409 410 return False 411 412 413def win32_restrict_file_to_user(fname): 414 """Secure a windows file to read-only access for the user. 415 Follows guidance from win32 library creator: 416 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 417 418 This method should be executed against an already generated file which 419 has no secrets written to it yet. 420 421 Parameters 422 ---------- 423 424 fname : unicode 425 The path to the file to secure 426 """ 427 try: 428 import win32api 429 except ImportError: 430 return _win32_restrict_file_to_user_ctypes(fname) 431 432 import win32security 433 import ntsecuritycon as con 434 435 # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone") 436 admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid) 437 user, _domain, _type = win32security.LookupAccountName("", win32api.GetUserNameEx(win32api.NameSamCompatible)) 438 439 sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION) 440 441 dacl = win32security.ACL() 442 # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone) 443 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE, user) 444 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins) 445 446 sd.SetSecurityDescriptorDacl(1, dacl, 0) 447 win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd) 448 449 450def _win32_restrict_file_to_user_ctypes(fname): 451 """Secure a windows file to read-only access for the user. 452 453 Follows guidance from win32 library creator: 454 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 455 456 This method should be executed against an already generated file which 457 has no secrets written to it yet. 458 459 Parameters 460 ---------- 461 462 fname : unicode 463 The path to the file to secure 464 """ 465 import ctypes 466 from ctypes import wintypes 467 468 advapi32 = ctypes.WinDLL('advapi32', use_last_error=True) 469 secur32 = ctypes.WinDLL('secur32', use_last_error=True) 470 471 NameSamCompatible = 2 472 WinBuiltinAdministratorsSid = 26 473 DACL_SECURITY_INFORMATION = 4 474 ACL_REVISION = 2 475 ERROR_INSUFFICIENT_BUFFER = 122 476 ERROR_MORE_DATA = 234 477 478 SYNCHRONIZE = 0x100000 479 DELETE = 0x00010000 480 STANDARD_RIGHTS_REQUIRED = 0xF0000 481 STANDARD_RIGHTS_READ = 0x20000 482 STANDARD_RIGHTS_WRITE = 0x20000 483 FILE_READ_DATA = 1 484 FILE_READ_EA = 8 485 FILE_READ_ATTRIBUTES = 128 486 FILE_WRITE_DATA = 2 487 FILE_APPEND_DATA = 4 488 FILE_WRITE_EA = 16 489 FILE_WRITE_ATTRIBUTES = 256 490 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF 491 FILE_GENERIC_READ = ( 492 STANDARD_RIGHTS_READ 493 | FILE_READ_DATA 494 | FILE_READ_ATTRIBUTES 495 | FILE_READ_EA 496 | SYNCHRONIZE 497 ) 498 FILE_GENERIC_WRITE = ( 499 STANDARD_RIGHTS_WRITE 500 | FILE_WRITE_DATA 501 | FILE_WRITE_ATTRIBUTES 502 | FILE_WRITE_EA 503 | FILE_APPEND_DATA 504 | SYNCHRONIZE 505 ) 506 507 class ACL(ctypes.Structure): 508 _fields_ = [ 509 ('AclRevision', wintypes.BYTE), 510 ('Sbz1', wintypes.BYTE), 511 ('AclSize', wintypes.WORD), 512 ('AceCount', wintypes.WORD), 513 ('Sbz2', wintypes.WORD), 514 ] 515 516 PSID = ctypes.c_void_p 517 PACL = ctypes.POINTER(ACL) 518 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE) 519 520 def _nonzero_success(result, func, args): 521 if not result: 522 raise ctypes.WinError(ctypes.get_last_error()) 523 return args 524 525 secur32.GetUserNameExW.errcheck = _nonzero_success 526 secur32.GetUserNameExW.restype = wintypes.BOOL 527 secur32.GetUserNameExW.argtypes = ( 528 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat 529 wintypes.LPWSTR, # LPWSTR lpNameBuffer, 530 wintypes.PULONG, # PULONG nSize 531 ) 532 533 advapi32.CreateWellKnownSid.errcheck = _nonzero_success 534 advapi32.CreateWellKnownSid.restype = wintypes.BOOL 535 advapi32.CreateWellKnownSid.argtypes = ( 536 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType 537 PSID, # PSID DomainSid 538 PSID, # PSID pSid 539 wintypes.PDWORD, # DWORD *cbSid 540 ) 541 542 advapi32.LookupAccountNameW.errcheck = _nonzero_success 543 advapi32.LookupAccountNameW.restype = wintypes.BOOL 544 advapi32.LookupAccountNameW.argtypes = ( 545 wintypes.LPWSTR, # LPCWSTR lpSystemName 546 wintypes.LPWSTR, # LPCWSTR lpAccountName 547 PSID, # PSID Sid 548 wintypes.LPDWORD, # LPDWORD cbSid 549 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName 550 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName 551 wintypes.LPDWORD, # PSID_NAME_USE peUse 552 ) 553 554 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success 555 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL 556 advapi32.AddAccessAllowedAce.argtypes = ( 557 PACL, # PACL pAcl 558 wintypes.DWORD, # DWORD dwAceRevision 559 wintypes.DWORD, # DWORD AccessMask 560 PSID, # PSID pSid 561 ) 562 563 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success 564 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL 565 advapi32.SetSecurityDescriptorDacl.argtypes = ( 566 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 567 wintypes.BOOL, # BOOL bDaclPresent 568 PACL, # PACL pDacl 569 wintypes.BOOL, # BOOL bDaclDefaulted 570 ) 571 572 advapi32.GetFileSecurityW.errcheck = _nonzero_success 573 advapi32.GetFileSecurityW.restype = wintypes.BOOL 574 advapi32.GetFileSecurityW.argtypes = ( 575 wintypes.LPCWSTR, # LPCWSTR lpFileName 576 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation 577 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 578 wintypes.DWORD, # DWORD nLength 579 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded 580 ) 581 582 advapi32.SetFileSecurityW.errcheck = _nonzero_success 583 advapi32.SetFileSecurityW.restype = wintypes.BOOL 584 advapi32.SetFileSecurityW.argtypes = ( 585 wintypes.LPCWSTR, # LPCWSTR lpFileName 586 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation 587 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 588 ) 589 590 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success 591 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL 592 advapi32.MakeAbsoluteSD.argtypes = ( 593 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 594 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 595 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize 596 PACL, # PACL pDacl 597 wintypes.LPDWORD, # LPDWORD lpdwDaclSize 598 PACL, # PACL pSacl 599 wintypes.LPDWORD, # LPDWORD lpdwSaclSize 600 PSID, # PSID pOwner 601 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize 602 PSID, # PSID pPrimaryGroup 603 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize 604 ) 605 606 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success 607 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL 608 advapi32.MakeSelfRelativeSD.argtypes = ( 609 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 610 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 611 wintypes.LPDWORD, # LPDWORD lpdwBufferLength 612 ) 613 614 advapi32.InitializeAcl.errcheck = _nonzero_success 615 advapi32.InitializeAcl.restype = wintypes.BOOL 616 advapi32.InitializeAcl.argtypes = ( 617 PACL, # PACL pAcl, 618 wintypes.DWORD, # DWORD nAclLength, 619 wintypes.DWORD, # DWORD dwAclRevision 620 ) 621 622 def CreateWellKnownSid(WellKnownSidType): 623 # return a SID for predefined aliases 624 pSid = (ctypes.c_char * 1)() 625 cbSid = wintypes.DWORD() 626 try: 627 advapi32.CreateWellKnownSid( 628 WellKnownSidType, None, pSid, ctypes.byref(cbSid) 629 ) 630 except OSError as e: 631 if e.winerror != ERROR_INSUFFICIENT_BUFFER: 632 raise 633 pSid = (ctypes.c_char * cbSid.value)() 634 advapi32.CreateWellKnownSid( 635 WellKnownSidType, None, pSid, ctypes.byref(cbSid) 636 ) 637 return pSid[:] 638 639 def GetUserNameEx(NameFormat): 640 # return the user or other security principal associated with 641 # the calling thread 642 nSize = ctypes.pointer(ctypes.c_ulong(0)) 643 try: 644 secur32.GetUserNameExW(NameFormat, None, nSize) 645 except WindowsError as e: 646 if e.winerror != ERROR_MORE_DATA: 647 raise 648 if not nSize.contents.value: 649 return None 650 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value) 651 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize) 652 return lpNameBuffer.value 653 654 def LookupAccountName(lpSystemName, lpAccountName): 655 # return a security identifier (SID) for an account on a system 656 # and the name of the domain on which the account was found 657 cbSid = wintypes.DWORD(0) 658 cchReferencedDomainName = wintypes.DWORD(0) 659 peUse = wintypes.DWORD(0) 660 try: 661 advapi32.LookupAccountNameW( 662 lpSystemName, 663 lpAccountName, 664 None, 665 ctypes.byref(cbSid), 666 None, 667 ctypes.byref(cchReferencedDomainName), 668 ctypes.byref(peUse), 669 ) 670 except WindowsError as e: 671 if e.winerror != ERROR_INSUFFICIENT_BUFFER: 672 raise 673 Sid = ctypes.create_unicode_buffer('', cbSid.value) 674 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID) 675 lpReferencedDomainName = ctypes.create_unicode_buffer( 676 '', cchReferencedDomainName.value + 1 677 ) 678 success = advapi32.LookupAccountNameW( 679 lpSystemName, 680 lpAccountName, 681 pSid, 682 ctypes.byref(cbSid), 683 lpReferencedDomainName, 684 ctypes.byref(cchReferencedDomainName), 685 ctypes.byref(peUse), 686 ) 687 if not success: 688 raise ctypes.WinError() 689 return pSid, lpReferencedDomainName.value, peUse.value 690 691 def AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid): 692 # add an access-allowed access control entry (ACE) 693 # to an access control list (ACL) 694 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid) 695 696 def GetFileSecurity(lpFileName, RequestedInformation): 697 # return information about the security of a file or directory 698 nLength = wintypes.DWORD(0) 699 try: 700 advapi32.GetFileSecurityW( 701 lpFileName, 702 RequestedInformation, 703 None, 704 0, 705 ctypes.byref(nLength), 706 ) 707 except WindowsError as e: 708 if e.winerror != ERROR_INSUFFICIENT_BUFFER: 709 raise 710 if not nLength.value: 711 return None 712 pSecurityDescriptor = (wintypes.BYTE * nLength.value)() 713 advapi32.GetFileSecurityW( 714 lpFileName, 715 RequestedInformation, 716 pSecurityDescriptor, 717 nLength, 718 ctypes.byref(nLength), 719 ) 720 return pSecurityDescriptor 721 722 def SetFileSecurity(lpFileName, RequestedInformation, pSecurityDescriptor): 723 # set the security of a file or directory object 724 advapi32.SetFileSecurityW( 725 lpFileName, RequestedInformation, pSecurityDescriptor 726 ) 727 728 def SetSecurityDescriptorDacl( 729 pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted 730 ): 731 # set information in a discretionary access control list (DACL) 732 advapi32.SetSecurityDescriptorDacl( 733 pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted 734 ) 735 736 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor): 737 # return a security descriptor in absolute format 738 # by using a security descriptor in self-relative format as a template 739 pAbsoluteSecurityDescriptor = None 740 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0) 741 pDacl = None 742 lpdwDaclSize = wintypes.DWORD(0) 743 pSacl = None 744 lpdwSaclSize = wintypes.DWORD(0) 745 pOwner = None 746 lpdwOwnerSize = wintypes.DWORD(0) 747 pPrimaryGroup = None 748 lpdwPrimaryGroupSize = wintypes.DWORD(0) 749 try: 750 advapi32.MakeAbsoluteSD( 751 pSelfRelativeSecurityDescriptor, 752 pAbsoluteSecurityDescriptor, 753 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 754 pDacl, 755 ctypes.byref(lpdwDaclSize), 756 pSacl, 757 ctypes.byref(lpdwSaclSize), 758 pOwner, 759 ctypes.byref(lpdwOwnerSize), 760 pPrimaryGroup, 761 ctypes.byref(lpdwPrimaryGroupSize), 762 ) 763 except WindowsError as e: 764 if e.winerror != ERROR_INSUFFICIENT_BUFFER: 765 raise 766 pAbsoluteSecurityDescriptor = ( 767 wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value 768 )() 769 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)() 770 pDacl = ctypes.cast(pDaclData, PACL).contents 771 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)() 772 pSacl = ctypes.cast(pSaclData, PACL).contents 773 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)() 774 pOwner = ctypes.cast(pOwnerData, PSID) 775 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)() 776 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID) 777 advapi32.MakeAbsoluteSD( 778 pSelfRelativeSecurityDescriptor, 779 pAbsoluteSecurityDescriptor, 780 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 781 pDacl, 782 ctypes.byref(lpdwDaclSize), 783 pSacl, 784 ctypes.byref(lpdwSaclSize), 785 pOwner, 786 lpdwOwnerSize, 787 pPrimaryGroup, 788 ctypes.byref(lpdwPrimaryGroupSize), 789 ) 790 return pAbsoluteSecurityDescriptor 791 792 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor): 793 # return a security descriptor in self-relative format 794 # by using a security descriptor in absolute format as a template 795 pSelfRelativeSecurityDescriptor = None 796 lpdwBufferLength = wintypes.DWORD(0) 797 try: 798 advapi32.MakeSelfRelativeSD( 799 pAbsoluteSecurityDescriptor, 800 pSelfRelativeSecurityDescriptor, 801 ctypes.byref(lpdwBufferLength), 802 ) 803 except WindowsError as e: 804 if e.winerror != ERROR_INSUFFICIENT_BUFFER: 805 raise 806 pSelfRelativeSecurityDescriptor = ( 807 wintypes.BYTE * lpdwBufferLength.value 808 )() 809 advapi32.MakeSelfRelativeSD( 810 pAbsoluteSecurityDescriptor, 811 pSelfRelativeSecurityDescriptor, 812 ctypes.byref(lpdwBufferLength), 813 ) 814 return pSelfRelativeSecurityDescriptor 815 816 def NewAcl(): 817 # return a new, initialized ACL (access control list) structure 818 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ? 819 acl_data = ctypes.create_string_buffer(nAclLength) 820 pAcl = ctypes.cast(acl_data, PACL).contents 821 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION) 822 return pAcl 823 824 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid) 825 SidUser = LookupAccountName('', GetUserNameEx(NameSamCompatible))[0] 826 827 Acl = NewAcl() 828 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins) 829 AddAccessAllowedAce( 830 Acl, 831 ACL_REVISION, 832 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE, 833 SidUser, 834 ) 835 836 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION) 837 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD) 838 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0) 839 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD) 840 841 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD) 842 843 844def get_file_mode(fname): 845 """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner. 846 847 Parameters 848 ---------- 849 850 fname : unicode 851 The path to the file to get mode from 852 853 """ 854 # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we 855 # should tolerate the execute bit on the file's owner when validating permissions - thus 856 # the missing least significant bit on the third octal digit. In addition, we also tolerate 857 # the sticky bit being set, so the lsb from the fourth octal digit is also removed. 858 return stat.S_IMODE(os.stat(fname).st_mode) & 0o6677 # Use 4 octal digits since S_IMODE does the same 859 860 861allow_insecure_writes = os.getenv('JUPYTER_ALLOW_INSECURE_WRITES', 'false').lower() in ('true', '1') 862 863 864@contextmanager 865def secure_write(fname, binary=False): 866 """Opens a file in the most restricted pattern available for 867 writing content. This limits the file mode to `0o0600` and yields 868 the resulting opened filed handle. 869 870 Parameters 871 ---------- 872 873 fname : unicode 874 The path to the file to write 875 876 binary: boolean 877 Indicates that the file is binary 878 """ 879 mode = 'wb' if binary else 'w' 880 open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC 881 try: 882 os.remove(fname) 883 except (IOError, OSError): 884 # Skip any issues with the file not existing 885 pass 886 887 if os.name == 'nt': 888 if allow_insecure_writes: 889 # Mounted file systems can have a number of failure modes inside this block. 890 # For windows machines in insecure mode we simply skip this to avoid failures :/ 891 issue_insecure_write_warning() 892 else: 893 # Python on windows does not respect the group and public bits for chmod, so we need 894 # to take additional steps to secure the contents. 895 # Touch file pre-emptively to avoid editing permissions in open files in Windows 896 fd = os.open(fname, open_flag, 0o0600) 897 os.close(fd) 898 open_flag = os.O_WRONLY | os.O_TRUNC 899 win32_restrict_file_to_user(fname) 900 901 with os.fdopen(os.open(fname, open_flag, 0o0600), mode) as f: 902 if os.name != 'nt': 903 # Enforce that the file got the requested permissions before writing 904 file_mode = get_file_mode(fname) 905 if 0o0600 != file_mode: 906 if allow_insecure_writes: 907 issue_insecure_write_warning() 908 else: 909 raise RuntimeError("Permissions assignment failed for secure file: '{file}'." 910 " Got '{permissions}' instead of '0o0600'." 911 .format(file=fname, permissions=oct(file_mode))) 912 yield f 913 914 915def issue_insecure_write_warning(): 916 def format_warning(msg, *args, **kwargs): 917 return str(msg) + '\n' 918 919 warnings.formatwarning = format_warning 920 warnings.warn("WARNING: Insecure writes have been enabled via environment variable " 921 "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the " 922 "variable or set its value to 'False'.") 923