1"""Path utility functions."""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
9
10
11import os
12import sys
13import stat
14import errno
15import site
16import tempfile
17import warnings
18from pathlib import Path
19
20from contextlib import contextmanager
21
# Short alias for os.path.join, used throughout this module.
pjoin = os.path.join

# UF_HIDDEN is a stat flag that may be missing from the stat module.
# It is used by BSD to indicate hidden files.
# Fall back to the literal value 32768 (0x8000) when stat does not define it.
UF_HIDDEN = getattr(stat, 'UF_HIDDEN', 32768)
27
28
def envset(name):
    """Tell whether the environment variable ``name`` is switched on.

    A variable counts as set when it holds any value other than
    'no', 'n', 'false', 'off', '0', or '0.0' (compared case-insensitively).
    A variable that is missing from the environment is treated as 'no'.
    """
    disabled_values = ('no', 'n', 'false', 'off', '0', '0.0')
    raw_value = os.environ.get(name, 'no')
    return raw_value.lower() not in disabled_values
36
def get_home_dir():
    """Return the user's home directory as a fully resolved path string."""
    # Resolving symlinks keeps things working even when /home/ is a symlink
    # to /usr/home, as it is on FreeBSD, for example.
    return str(Path(os.path.expanduser('~')).resolve())
44
# Cache of temporary directories created by _mkdtemp_once, keyed by name.
_dtemps = {}
def _mkdtemp_once(name):
    """Create a temporary directory for ``name``, or reuse the cached one.

    Repeated calls with the same name within one process always yield the
    same directory path. The directory is created with ``name + '-'`` as
    its prefix.
    """
    if name not in _dtemps:
        _dtemps[name] = tempfile.mkdtemp(prefix=name + '-')
    return _dtemps[name]
57
def jupyter_config_dir():
    """Get the Jupyter config directory for this platform and user.

    Resolution order:

    1. If JUPYTER_NO_CONFIG is set (non-empty), a per-process temporary
       directory is returned so that no existing configuration is picked up.
    2. If JUPYTER_CONFIG_DIR is set (non-empty), its value is returned as-is.
    3. Otherwise ``~/.jupyter`` is returned, with ``~`` resolved to the real
       home directory.

    Returns
    -------
    str
        The path of the config directory.
    """
    env = os.environ

    if env.get('JUPYTER_NO_CONFIG'):
        return _mkdtemp_once('jupyter-clean-cfg')

    if env.get('JUPYTER_CONFIG_DIR'):
        return env['JUPYTER_CONFIG_DIR']

    # Only resolve the home directory when it is actually needed, so explicit
    # overrides keep working even if the home directory cannot be resolved.
    return pjoin(get_home_dir(), '.jupyter')
74
75
def jupyter_data_dir():
    """Return the directory holding Jupyter data files for this user.

    Data files are the non-transient, non-configuration files.

    JUPYTER_DATA_DIR wins when it is defined and non-empty; otherwise a
    platform-appropriate location is chosen:

    * macOS: ``~/Library/Jupyter``
    * Windows: ``%APPDATA%\\jupyter`` (resolved), or ``<config dir>/data``
      when APPDATA is not available
    * everything else: ``$XDG_DATA_HOME/jupyter``, defaulting to
      ``~/.local/share/jupyter``
    """
    env = os.environ

    override = env.get('JUPYTER_DATA_DIR')
    if override:
        return override

    home = get_home_dir()

    if sys.platform == 'darwin':
        return os.path.join(home, 'Library', 'Jupyter')

    if os.name == 'nt':
        appdata = os.environ.get('APPDATA', None)
        if not appdata:
            return pjoin(jupyter_config_dir(), 'data')
        return str(Path(appdata, 'jupyter').resolve())

    # Linux, non-OS X Unix, AIX, etc.
    xdg_base = env.get("XDG_DATA_HOME", None) or pjoin(home, '.local', 'share')
    return pjoin(xdg_base, 'jupyter')
104
105
def jupyter_runtime_dir():
    """Return the directory used for transient Jupyter runtime files.

    JUPYTER_RUNTIME_DIR is honoured when it is defined and non-empty.

    Otherwise the result is ``<data dir>/runtime`` on every platform;
    XDG_RUNTIME_DIR is no longer consulted after various problems with it.
    """
    explicit = os.environ.get('JUPYTER_RUNTIME_DIR')
    if not explicit:
        return pjoin(jupyter_data_dir(), 'runtime')
    return explicit
120
121
# System-wide data directories; jupyter_path() appends these last.
if os.name == 'nt':
    # On Windows, use <PROGRAMDATA>/jupyter when PROGRAMDATA is set.
    programdata = os.environ.get('PROGRAMDATA', None)
    if programdata:
        SYSTEM_JUPYTER_PATH = [pjoin(programdata, 'jupyter')]
    else:  # PROGRAMDATA is not defined by default on XP.
        # Fall back to the share/jupyter directory under sys.prefix.
        SYSTEM_JUPYTER_PATH = [os.path.join(sys.prefix, 'share', 'jupyter')]
else:
    SYSTEM_JUPYTER_PATH = [
        "/usr/local/share/jupyter",
        "/usr/share/jupyter",
    ]

# Data directory belonging to the running Python environment (sys.prefix).
ENV_JUPYTER_PATH = [os.path.join(sys.prefix, 'share', 'jupyter')]
135
136
def jupyter_path(*subdirs):
    """Return a list of directories to search for data files.

    The search order is:

    1. Entries of the JUPYTER_PATH environment variable (highest priority),
       split on ``os.pathsep`` with trailing path separators stripped.
    2. User-level and environment-level directories. User-level comes first
       unless the JUPYTER_PREFER_ENV_PATH environment variable is set, in
       which case environment-level directories take priority.
    3. System-level directories (``SYSTEM_JUPYTER_PATH``).

    If the Python site.ENABLE_USER_SITE variable is True, we also add the
    appropriate Python user site subdirectory (``<userbase>/share/jupyter``)
    to the user-level directories, provided a user base is known.

    Parameters
    ----------
    *subdirs : str
        If given, these path components are appended to each element.

    Returns
    -------
    list of str
        The search path, highest priority first.

    Examples
    --------
    Illustrative output only; actual values depend on platform and
    environment::

        jupyter_path()
        # e.g. ['~/.local/share/jupyter', '/usr/local/share/jupyter']
        jupyter_path('kernels')
        # e.g. ['~/.local/share/jupyter/kernels',
        #       '/usr/local/share/jupyter/kernels']
    """

    paths = []

    # highest priority is explicit environment variable
    if os.environ.get('JUPYTER_PATH'):
        paths.extend(
            p.rstrip(os.sep)
            for p in os.environ['JUPYTER_PATH'].split(os.pathsep)
        )

    # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
    user = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # Check if site.getuserbase() exists to be compatible with virtualenv,
        # which often does not have this method.
        if hasattr(site, 'getuserbase'):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE
        # site.USER_BASE may still be None (it is only filled in once
        # getuserbase() has run); skip the user site dir instead of crashing
        # in os.path.join.
        if userbase:
            userdir = os.path.join(userbase, 'share', 'jupyter')
            if userdir not in user:
                user.append(userdir)

    # Environment dirs that duplicate a system dir are only listed once,
    # in the system position.
    env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]

    if envset('JUPYTER_PREFER_ENV_PATH'):
        paths.extend(env)
        paths.extend(user)
    else:
        paths.extend(user)
        paths.extend(env)

    # finally, system
    paths.extend(SYSTEM_JUPYTER_PATH)

    # add subdir, if requested
    if subdirs:
        paths = [pjoin(p, *subdirs) for p in paths]
    return paths
197
198
# System-wide config directories; jupyter_config_path() appends these last.
if os.name == 'nt':
    # On Windows, use <PROGRAMDATA>/jupyter when PROGRAMDATA is set.
    programdata = os.environ.get('PROGRAMDATA', None)
    if programdata:
        SYSTEM_CONFIG_PATH = [os.path.join(programdata, 'jupyter')]
    else:  # PROGRAMDATA is not defined by default on XP.
        # No system-level config location in that case.
        SYSTEM_CONFIG_PATH = []
else:
    SYSTEM_CONFIG_PATH = [
        "/usr/local/etc/jupyter",
        "/etc/jupyter",
    ]

# Config directory belonging to the running Python environment (sys.prefix).
ENV_CONFIG_PATH = [os.path.join(sys.prefix, 'etc', 'jupyter')]
212
213
def jupyter_config_path():
    """Return the search path for Jupyter config files as a list.

    When JUPYTER_NO_CONFIG is set, the list contains only the (blank)
    directory returned by jupyter_config_dir().

    Otherwise the search order is:

    1. Entries of the JUPYTER_CONFIG_PATH environment variable (highest
       priority), split on ``os.pathsep`` with trailing path separators
       stripped.
    2. User-level and environment-level directories. If the
       JUPYTER_PREFER_ENV_PATH environment variable is set, the
       environment-level directories will have priority over user-level
       directories.
    3. System-level directories (``SYSTEM_CONFIG_PATH``).

    If the Python site.ENABLE_USER_SITE variable is True, we also add the
    appropriate Python user site subdirectory (``<userbase>/etc/jupyter``)
    to the user-level directories, provided a user base is known.

    Returns
    -------
    list of str
        The search path, highest priority first.
    """
    if os.environ.get('JUPYTER_NO_CONFIG'):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    paths = []

    # highest priority is explicit environment variable
    if os.environ.get('JUPYTER_CONFIG_PATH'):
        paths.extend(
            p.rstrip(os.sep)
            for p in os.environ['JUPYTER_CONFIG_PATH'].split(os.pathsep)
        )

    # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
    user = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        # Check if site.getuserbase() exists to be compatible with virtualenv,
        # which often does not have this method.
        if hasattr(site, 'getuserbase'):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE

        # site.USER_BASE may still be None (it is only filled in once
        # getuserbase() has run); skip the user site dir instead of crashing
        # in os.path.join.
        if userbase:
            userdir = os.path.join(userbase, 'etc', 'jupyter')
            if userdir not in user:
                user.append(userdir)

    # Environment dirs that duplicate a system dir are only listed once,
    # in the system position.
    env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]

    if envset('JUPYTER_PREFER_ENV_PATH'):
        paths.extend(env)
        paths.extend(user)
    else:
        paths.extend(user)
        paths.extend(env)

    # Finally, system path
    paths.extend(SYSTEM_CONFIG_PATH)
    return paths
263
264
def exists(path):
    """Report whether ``path`` exists, based on ``os.lstat``.

    Replacement for `os.path.exists` which works for host mapped volumes
    on Windows containers. Because ``lstat`` is used, a symlink counts as
    existing even when its target does not.
    """
    try:
        os.lstat(path)
    except OSError:
        found = False
    else:
        found = True
    return found
274
275
def is_file_hidden_win(abs_path, stat_res=None):
    """Decide whether a single file is hidden, using Windows rules.

    Only the file itself is examined; combine this with a check of the
    containing directories, or use is_hidden() which does both.

    A file is hidden when its basename starts with '.' or when its stat
    result carries the FILE_ATTRIBUTE_HIDDEN attribute. A path that does not
    exist is reported as not hidden.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.
    """
    file_name = os.path.basename(abs_path)
    if file_name.startswith('.'):
        return True

    if stat_res is None:
        try:
            stat_res = os.stat(abs_path)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    try:
        hidden = bool(stat_res.st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN)
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn("hidden files are not detectable on this system, so no file will be marked as hidden.")
        return False

    return hidden
314
315
def is_file_hidden_posix(abs_path, stat_res=None):
    """Decide whether a single file is hidden, using POSIX rules.

    Only the file itself is examined; combine this with a check of the
    containing directories, or use is_hidden() which does both.

    A file is hidden when its basename starts with '.', when it is a
    directory that cannot be read and entered, or when its stat flags
    include UF_HIDDEN. A path that does not exist is reported as not hidden.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, or if it
        describes a symlink, this function will call stat() internally.
    """
    if os.path.basename(abs_path).startswith('.'):
        return True

    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = os.stat(abs_path)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    # Directories that cannot be listed are treated as hidden. Use an access
    # check rather than an actual listing, in case of slow/large listings.
    is_directory = stat.S_ISDIR(stat_res.st_mode)
    if is_directory and not os.access(abs_path, os.X_OK | os.R_OK):
        return True

    # Finally, honour the UF_HIDDEN flag where the platform reports st_flags.
    return bool(getattr(stat_res, 'st_flags', 0) & UF_HIDDEN)
354
355
# Bind is_file_hidden to the implementation matching the running platform.
if sys.platform == 'win32':
    is_file_hidden = is_file_hidden_win
else:
    is_file_hidden = is_file_hidden_posix
360
361
def is_hidden(abs_path, abs_root=''):
    """Tell whether a path is hidden or lives inside a hidden directory.

    The check begins at the rightmost path element and walks back towards
    the given root. A location is hidden when its name starts with '.' or
    when stat reports the UF_HIDDEN flag for it.

    When abs_path refers to the same directory as abs_root the answer is
    always False, even if that directory is itself hidden: only files and
    directories *within* abs_root are examined.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check for hidden directories.
    abs_root : unicode
        The absolute path of the root directory in which hidden directories
        should be checked for.
    """
    if os.path.normpath(abs_path) == os.path.normpath(abs_root):
        return False

    if is_file_hidden(abs_path):
        return True

    # Without an explicit root, use the first path component plus a separator.
    root = abs_root or abs_path.split(os.sep, 1)[0] + os.sep

    relative = abs_path[len(root):]
    for component in relative.split(os.sep):
        if component.startswith('.'):
            return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    current = os.path.dirname(abs_path)
    while current and current != root and current.startswith(root):
        if exists(current):
            try:
                # may fail on Windows junctions
                info = os.lstat(current)
            except OSError:
                return True
            if getattr(info, 'st_flags', 0) & UF_HIDDEN:
                return True
        current = os.path.dirname(current)

    return False
411
412
def win32_restrict_file_to_user(fname):
    """Restrict a Windows file's access list to the current user and admins.

    The file's DACL is replaced by one that grants the current user generic
    read, generic write and delete rights, and grants the built-in
    Administrators group full access.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    When ``win32api`` cannot be imported, the ctypes-based implementation
    is used instead.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    try:
        import win32api
    except ImportError:
        return _win32_restrict_file_to_user_ctypes(fname)

    import win32security
    import ntsecuritycon as con

    admin_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
    user_sid, _domain, _type = win32security.LookupAccountName("", account_name)

    descriptor = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)

    # Build a fresh DACL containing only the two entries we want.
    user_rights = con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE
    access_list = win32security.ACL()
    access_list.AddAccessAllowedAce(win32security.ACL_REVISION, user_rights, user_sid)
    access_list.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admin_sid)

    descriptor.SetSecurityDescriptorDacl(1, access_list, 0)
    win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, descriptor)
448
449
def _win32_restrict_file_to_user_ctypes(fname):
    """Restrict a Windows file's access list to the current user and admins.

    ctypes-based counterpart of win32_restrict_file_to_user, used when the
    pywin32 modules are not importable. It builds a new ACL that grants
    FILE_ALL_ACCESS to the built-in Administrators SID and generic read,
    generic write and delete rights to the current user, then installs that
    ACL as the DACL of ``fname``.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure

    Raises
    ------
    OSError
        Via ctypes.WinError, when any of the wrapped Win32 calls reports
        failure (see the ``_nonzero_success`` errcheck hook below).
    """
    import ctypes
    from ctypes import wintypes

    # use_last_error=True lets ctypes.get_last_error() report the failure
    # code of the most recent call made through these handles.
    advapi32 = ctypes.WinDLL('advapi32', use_last_error=True)
    secur32 = ctypes.WinDLL('secur32', use_last_error=True)

    # Win32 constants used by the calls below.
    NameSamCompatible = 2  # EXTENDED_NAME_FORMAT value passed to GetUserNameExW
    WinBuiltinAdministratorsSid = 26  # WELL_KNOWN_SID_TYPE value
    DACL_SECURITY_INFORMATION = 4
    ACL_REVISION = 2
    ERROR_INSUFFICIENT_BUFFER = 122
    ERROR_MORE_DATA = 234

    # Access-right bit masks.
    SYNCHRONIZE = 0x100000
    DELETE = 0x00010000
    STANDARD_RIGHTS_REQUIRED = 0xF0000
    STANDARD_RIGHTS_READ = 0x20000
    STANDARD_RIGHTS_WRITE = 0x20000
    FILE_READ_DATA = 1
    FILE_READ_EA = 8
    FILE_READ_ATTRIBUTES = 128
    FILE_WRITE_DATA = 2
    FILE_APPEND_DATA = 4
    FILE_WRITE_EA = 16
    FILE_WRITE_ATTRIBUTES = 256
    FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
    FILE_GENERIC_READ = (
        STANDARD_RIGHTS_READ
        | FILE_READ_DATA
        | FILE_READ_ATTRIBUTES
        | FILE_READ_EA
        | SYNCHRONIZE
    )
    FILE_GENERIC_WRITE = (
        STANDARD_RIGHTS_WRITE
        | FILE_WRITE_DATA
        | FILE_WRITE_ATTRIBUTES
        | FILE_WRITE_EA
        | FILE_APPEND_DATA
        | SYNCHRONIZE
    )

    # Header layout of an ACL as handed to the advapi32 functions below.
    class ACL(ctypes.Structure):
        _fields_ = [
            ('AclRevision', wintypes.BYTE),
            ('Sbz1', wintypes.BYTE),
            ('AclSize', wintypes.WORD),
            ('AceCount', wintypes.WORD),
            ('Sbz2', wintypes.WORD),
        ]

    # Pointer type aliases used in the argtypes declarations.
    PSID = ctypes.c_void_p
    PACL = ctypes.POINTER(ACL)
    PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)

    def _nonzero_success(result, func, args):
        # errcheck hook shared by every wrapped function: a zero/false result
        # is turned into an OSError built from the thread's last error code.
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    # --- prototypes for the Win32 functions used below ---

    secur32.GetUserNameExW.errcheck = _nonzero_success
    secur32.GetUserNameExW.restype = wintypes.BOOL
    secur32.GetUserNameExW.argtypes = (
        ctypes.c_int,  # EXTENDED_NAME_FORMAT NameFormat
        wintypes.LPWSTR,  # LPWSTR lpNameBuffer,
        wintypes.PULONG,  # PULONG nSize
    )

    advapi32.CreateWellKnownSid.errcheck = _nonzero_success
    advapi32.CreateWellKnownSid.restype = wintypes.BOOL
    advapi32.CreateWellKnownSid.argtypes = (
        wintypes.DWORD,  # WELL_KNOWN_SID_TYPE WellKnownSidType
        PSID,  # PSID DomainSid
        PSID,  # PSID pSid
        wintypes.PDWORD,  # DWORD *cbSid
    )

    advapi32.LookupAccountNameW.errcheck = _nonzero_success
    advapi32.LookupAccountNameW.restype = wintypes.BOOL
    advapi32.LookupAccountNameW.argtypes = (
        wintypes.LPWSTR,  # LPCWSTR lpSystemName
        wintypes.LPWSTR,  # LPCWSTR lpAccountName
        PSID,  # PSID Sid
        wintypes.LPDWORD,  # LPDWORD cbSid
        wintypes.LPWSTR,  # LPCWSTR ReferencedDomainName
        wintypes.LPDWORD,  # LPDWORD cchReferencedDomainName
        wintypes.LPDWORD,  # PSID_NAME_USE peUse
    )

    advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
    advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
    advapi32.AddAccessAllowedAce.argtypes = (
        PACL,  # PACL pAcl
        wintypes.DWORD,  # DWORD dwAceRevision
        wintypes.DWORD,  # DWORD AccessMask
        PSID,  # PSID pSid
    )

    advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
    advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
    advapi32.SetSecurityDescriptorDacl.argtypes = (
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.BOOL,  # BOOL bDaclPresent
        PACL,  # PACL pDacl
        wintypes.BOOL,  # BOOL bDaclDefaulted
    )

    advapi32.GetFileSecurityW.errcheck = _nonzero_success
    advapi32.GetFileSecurityW.restype = wintypes.BOOL
    advapi32.GetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION RequestedInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.DWORD,  # DWORD nLength
        wintypes.LPDWORD,  # LPDWORD lpnLengthNeeded
    )

    advapi32.SetFileSecurityW.errcheck = _nonzero_success
    advapi32.SetFileSecurityW.restype = wintypes.BOOL
    advapi32.SetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION SecurityInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
    )

    advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
    advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
    advapi32.MakeAbsoluteSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwAbsoluteSecurityDescriptorSize
        PACL,  # PACL pDacl
        wintypes.LPDWORD,  # LPDWORD lpdwDaclSize
        PACL,  # PACL pSacl
        wintypes.LPDWORD,  # LPDWORD lpdwSaclSize
        PSID,  # PSID pOwner
        wintypes.LPDWORD,  # LPDWORD lpdwOwnerSize
        PSID,  # PSID pPrimaryGroup
        wintypes.LPDWORD,  # LPDWORD lpdwPrimaryGroupSize
    )

    advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
    advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
    advapi32.MakeSelfRelativeSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwBufferLength
    )

    advapi32.InitializeAcl.errcheck = _nonzero_success
    advapi32.InitializeAcl.restype = wintypes.BOOL
    advapi32.InitializeAcl.argtypes = (
        PACL,  # PACL pAcl,
        wintypes.DWORD,  # DWORD nAclLength,
        wintypes.DWORD,  # DWORD dwAclRevision
    )

    # --- thin Python wrappers around the prototypes above ---
    # Several wrappers call the API twice: first with an empty/too-small
    # buffer to learn the required size (tolerating only the expected
    # "buffer too small" error), then again with a buffer of that size.

    def CreateWellKnownSid(WellKnownSidType):
        # return a SID for predefined aliases
        pSid = (ctypes.c_char * 1)()
        cbSid = wintypes.DWORD()
        try:
            advapi32.CreateWellKnownSid(
                WellKnownSidType, None, pSid, ctypes.byref(cbSid)
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:
                raise
            # Retry with a buffer of the size reported through cbSid.
            pSid = (ctypes.c_char * cbSid.value)()
            advapi32.CreateWellKnownSid(
                WellKnownSidType, None, pSid, ctypes.byref(cbSid)
            )
        # Return a copy of the buffer contents as bytes.
        return pSid[:]

    def GetUserNameEx(NameFormat):
        # return the user or other security principal associated with
        # the calling thread
        nSize = ctypes.pointer(ctypes.c_ulong(0))
        try:
            # Size probe: no buffer supplied, nSize receives the needed length.
            secur32.GetUserNameExW(NameFormat, None, nSize)
        except WindowsError as e:
            if e.winerror != ERROR_MORE_DATA:
                raise
        if not nSize.contents.value:
            return None
        lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
        secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
        return lpNameBuffer.value

    def LookupAccountName(lpSystemName, lpAccountName):
        # return a security identifier (SID) for an account on a system
        # and the name of the domain on which the account was found
        cbSid = wintypes.DWORD(0)
        cchReferencedDomainName = wintypes.DWORD(0)
        peUse = wintypes.DWORD(0)
        try:
            # Size probe: no SID or domain buffers supplied.
            advapi32.LookupAccountNameW(
                lpSystemName,
                lpAccountName,
                None,
                ctypes.byref(cbSid),
                None,
                ctypes.byref(cchReferencedDomainName),
                ctypes.byref(peUse),
            )
        except WindowsError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:
                raise
        # NOTE(review): the SID buffer is allocated as cbSid *wide characters*,
        # i.e. larger than cbSid bytes — presumably harmless over-allocation;
        # confirm.
        Sid = ctypes.create_unicode_buffer('', cbSid.value)
        pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
        lpReferencedDomainName = ctypes.create_unicode_buffer(
            '', cchReferencedDomainName.value + 1
        )
        success = advapi32.LookupAccountNameW(
            lpSystemName,
            lpAccountName,
            pSid,
            ctypes.byref(cbSid),
            lpReferencedDomainName,
            ctypes.byref(cchReferencedDomainName),
            ctypes.byref(peUse),
        )
        # NOTE(review): the errcheck hook already raises on a zero result, so
        # this check looks redundant — confirm before removing.
        if not success:
            raise ctypes.WinError()
        return pSid, lpReferencedDomainName.value, peUse.value

    def AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid):
        # add an access-allowed access control entry (ACE)
        # to an access control list (ACL)
        advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)

    def GetFileSecurity(lpFileName, RequestedInformation):
        # return information about the security of a file or directory
        nLength = wintypes.DWORD(0)
        try:
            # Size probe: zero-length buffer, nLength receives the needed size.
            advapi32.GetFileSecurityW(
                lpFileName,
                RequestedInformation,
                None,
                0,
                ctypes.byref(nLength),
            )
        except WindowsError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:
                raise
        if not nLength.value:
            return None
        pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
        advapi32.GetFileSecurityW(
            lpFileName,
            RequestedInformation,
            pSecurityDescriptor,
            nLength,
            ctypes.byref(nLength),
        )
        return pSecurityDescriptor

    def SetFileSecurity(lpFileName, RequestedInformation, pSecurityDescriptor):
        # set the security of a file or directory object
        advapi32.SetFileSecurityW(
            lpFileName, RequestedInformation, pSecurityDescriptor
        )

    def SetSecurityDescriptorDacl(
        pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted
    ):
        # set information in a discretionary access control list (DACL)
        advapi32.SetSecurityDescriptorDacl(
            pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted
        )

    def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor):
        # return a security descriptor in absolute format
        # by using a security descriptor in self-relative format as a template
        pAbsoluteSecurityDescriptor = None
        lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
        pDacl = None
        lpdwDaclSize = wintypes.DWORD(0)
        pSacl = None
        lpdwSaclSize = wintypes.DWORD(0)
        pOwner = None
        lpdwOwnerSize = wintypes.DWORD(0)
        pPrimaryGroup = None
        lpdwPrimaryGroupSize = wintypes.DWORD(0)
        try:
            # Size probe: all output buffers are None, sizes are filled in.
            advapi32.MakeAbsoluteSD(
                pSelfRelativeSecurityDescriptor,
                pAbsoluteSecurityDescriptor,
                ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
                pDacl,
                ctypes.byref(lpdwDaclSize),
                pSacl,
                ctypes.byref(lpdwSaclSize),
                pOwner,
                ctypes.byref(lpdwOwnerSize),
                pPrimaryGroup,
                ctypes.byref(lpdwPrimaryGroupSize),
            )
        except WindowsError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:
                raise
        # Allocate each output buffer with the size reported by the probe.
        pAbsoluteSecurityDescriptor = (
            wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value
        )()
        pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
        pDacl = ctypes.cast(pDaclData, PACL).contents
        pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
        pSacl = ctypes.cast(pSaclData, PACL).contents
        pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
        pOwner = ctypes.cast(pOwnerData, PSID)
        pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
        pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
        # NOTE(review): lpdwOwnerSize is passed without ctypes.byref, unlike
        # the other size arguments; ctypes is documented to apply byref
        # automatically for POINTER argtypes, so this presumably behaves the
        # same — confirm intent.
        advapi32.MakeAbsoluteSD(
            pSelfRelativeSecurityDescriptor,
            pAbsoluteSecurityDescriptor,
            ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
            pDacl,
            ctypes.byref(lpdwDaclSize),
            pSacl,
            ctypes.byref(lpdwSaclSize),
            pOwner,
            lpdwOwnerSize,
            pPrimaryGroup,
            ctypes.byref(lpdwPrimaryGroupSize),
        )
        return pAbsoluteSecurityDescriptor

    def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor):
        # return a security descriptor in self-relative format
        # by using a security descriptor in absolute format as a template
        pSelfRelativeSecurityDescriptor = None
        lpdwBufferLength = wintypes.DWORD(0)
        try:
            # Size probe: no output buffer, lpdwBufferLength is filled in.
            advapi32.MakeSelfRelativeSD(
                pAbsoluteSecurityDescriptor,
                pSelfRelativeSecurityDescriptor,
                ctypes.byref(lpdwBufferLength),
            )
        except WindowsError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:
                raise
        pSelfRelativeSecurityDescriptor = (
            wintypes.BYTE * lpdwBufferLength.value
        )()
        advapi32.MakeSelfRelativeSD(
            pAbsoluteSecurityDescriptor,
            pSelfRelativeSecurityDescriptor,
            ctypes.byref(lpdwBufferLength),
        )
        return pSelfRelativeSecurityDescriptor

    def NewAcl():
        # return a new, initialized ACL (access control list) structure
        # A fixed-size buffer is used instead of a computed length.
        nAclLength = 32767  # TODO: calculate this: ctypes.sizeof(ACL) + ?
        acl_data = ctypes.create_string_buffer(nAclLength)
        pAcl = ctypes.cast(acl_data, PACL).contents
        advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
        return pAcl

    # --- main sequence ---

    # Look up the two principals that will be granted access.
    SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
    SidUser = LookupAccountName('', GetUserNameEx(NameSamCompatible))[0]

    # Build the new ACL: full access for admins, read/write/delete for the user.
    Acl = NewAcl()
    AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
    AddAccessAllowedAce(
        Acl,
        ACL_REVISION,
        FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
        SidUser,
    )

    # Fetch the file's descriptor, convert it to absolute form so the DACL
    # can be set on it, then convert back to self-relative form for writing.
    SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
    AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
    SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
    SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)

    SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
842
843
def get_file_mode(fname):
    """Return the permission bits of ``fname``, ignoring filesystem quirks.

    The owner-execute bit and the sticky bit are masked out of the result,
    so they never cause a permissions comparison to fail.

    Parameters
    ----------

    fname : unicode
        The path to the file to get mode from

    """
    # Some filesystems (e.g., CIFS) auto-enable the execute bit on files, so
    # the owner's execute bit is tolerated when validating permissions: that
    # is the cleared least significant bit of the third octal digit. The
    # sticky bit is tolerated too, hence the cleared lsb of the fourth digit.
    # Four octal digits are used because S_IMODE yields four as well.
    tolerant_mask = 0o6677
    permission_bits = stat.S_IMODE(os.stat(fname).st_mode)
    return permission_bits & tolerant_mask
859
860
# Read once at import time: True only when JUPYTER_ALLOW_INSECURE_WRITES is
# 'true' or '1' (case-insensitive). Consulted by secure_write().
allow_insecure_writes = os.getenv('JUPYTER_ALLOW_INSECURE_WRITES', 'false').lower() in ('true', '1')
862
863
@contextmanager
def secure_write(fname, binary=False):
    """Context manager yielding a file handle opened as restrictively as
    possible for writing. The file is created with mode `0o0600`, and any
    pre-existing file at ``fname`` is removed first.

    On Windows the file is additionally restricted to the current user,
    unless insecure writes are enabled. Elsewhere the resulting file mode is
    verified before the handle is yielded; a mismatch raises RuntimeError
    unless insecure writes are enabled, in which case a warning is issued.

    Parameters
    ----------

    fname : unicode
        The path to the file to write

    binary: boolean
        Indicates that the file is binary
    """
    on_windows = os.name == 'nt'
    file_mode_str = 'wb' if binary else 'w'
    flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    try:
        os.remove(fname)
    except OSError:
        # Skip any issues with the file not existing
        pass

    if on_windows:
        if allow_insecure_writes:
            # Mounted file systems can have a number of failure modes inside this block.
            # For windows machines in insecure mode we simply skip this to avoid failures :/
            issue_insecure_write_warning()
        else:
            # Python on windows does not respect the group and public bits for chmod, so we need
            # to take additional steps to secure the contents.
            # Touch file pre-emptively to avoid editing permissions in open files in Windows
            os.close(os.open(fname, flags, 0o0600))
            flags = os.O_WRONLY | os.O_TRUNC
            win32_restrict_file_to_user(fname)

    descriptor = os.open(fname, flags, 0o0600)
    with os.fdopen(descriptor, file_mode_str) as handle:
        if not on_windows:
            # Enforce that the file got the requested permissions before writing
            actual_mode = get_file_mode(fname)
            if actual_mode != 0o0600:
                if not allow_insecure_writes:
                    raise RuntimeError("Permissions assignment failed for secure file: '{file}'."
                        " Got '{permissions}' instead of '0o0600'."
                        .format(file=fname, permissions=oct(actual_mode)))
                issue_insecure_write_warning()
        yield handle
913
914
def issue_insecure_write_warning():
    """Warn that insecure writes were enabled through the environment.

    As a side effect, ``warnings.formatwarning`` is replaced process-wide
    with a formatter that emits only the message text and a newline.
    """
    def _message_only(msg, *args, **kwargs):
        return str(msg) + '\n'

    warnings.formatwarning = _message_only

    notice = ("WARNING: Insecure writes have been enabled via environment variable "
              "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
              "variable or set its value to 'False'.")
    warnings.warn(notice)
923