1"""Tools for managing kernel specs"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import errno
7import io
8import json
9import os
10import re
11import shutil
12import warnings
13
# Short alias for os.path.join, used by the path-building helpers in this module.
pjoin = os.path.join
15
16from traitlets import (
17    HasTraits, List, Unicode, Dict, Set, Bool, Type, CaselessStrEnum
18)
19from traitlets.config import LoggingConfigurable
20
21from jupyter_core.paths import jupyter_data_dir, jupyter_path, SYSTEM_JUPYTER_PATH
22
23
# Name under which KernelSpecManager.find_kernel_specs() registers the kernel
# resources shipped with ipykernel when no kernelspec of that name was found
# in the searched directories.
NATIVE_KERNEL_NAME = 'python3'
25
26
class KernelSpec(HasTraits):
    """Trait-based container for the contents of one kernel spec.

    The traits are populated from keyword arguments; ``from_resource_dir``
    fills them from the keys of a ``kernel.json`` file.
    """

    argv = List()
    display_name = Unicode()
    language = Unicode()
    env = Dict()
    resource_dir = Unicode()
    interrupt_mode = CaselessStrEnum(['message', 'signal'], default_value='signal')
    metadata = Dict()

    @classmethod
    def from_resource_dir(cls, resource_dir):
        """Build a KernelSpec from the kernel.json found in ``resource_dir``.

        ``resource_dir`` is the path of the *directory* holding kernel.json,
        not the path of the file itself.  The file is read as UTF-8 and its
        top-level keys are passed to the constructor as keyword arguments.
        """
        spec_path = pjoin(resource_dir, 'kernel.json')
        with io.open(spec_path, 'r', encoding='utf-8') as fh:
            loaded = json.loads(fh.read())
        return cls(resource_dir=resource_dir, **loaded)

    def to_dict(self):
        """Return the spec as a plain dict (``resource_dir`` is not included)."""
        return {
            'argv': self.argv,
            'env': self.env,
            'display_name': self.display_name,
            'language': self.language,
            'interrupt_mode': self.interrupt_mode,
            'metadata': self.metadata,
        }

    def to_json(self):
        """Serialise this kernelspec to a JSON object.

        Returns the JSON text as a string.
        """
        as_dict = self.to_dict()
        return json.dumps(as_dict)
66
67
68_kernel_name_pat = re.compile(r'^[a-z0-9._\-]+$', re.IGNORECASE)
69
70def _is_valid_kernel_name(name):
71    """Check that a kernel name is valid."""
72    # quote is not unicode-safe on Python 2
73    return _kernel_name_pat.match(name)
74
75
# Human-readable statement of the naming rule.  It is interpolated into the
# warning, log and ValueError messages produced in this module when a kernel
# name fails _is_valid_kernel_name().
_kernel_name_description = "Kernel names can only contain ASCII letters and numbers and these separators:" \
 " - . _ (hyphen, period, and underscore)."
78
79
80def _is_kernel_dir(path):
81    """Is ``path`` a kernel directory?"""
82    return os.path.isdir(path) and os.path.isfile(pjoin(path, 'kernel.json'))
83
84
85def _list_kernels_in(dir):
86    """Return a mapping of kernel names to resource directories from dir.
87
88    If dir is None or does not exist, returns an empty dict.
89    """
90    if dir is None or not os.path.isdir(dir):
91        return {}
92    kernels = {}
93    for f in os.listdir(dir):
94        path = pjoin(dir, f)
95        if not _is_kernel_dir(path):
96            continue
97        key = f.lower()
98        if not _is_valid_kernel_name(key):
99            warnings.warn("Invalid kernelspec directory name (%s): %s"
100                % (_kernel_name_description, path), stacklevel=3,
101            )
102        kernels[key] = path
103    return kernels
104
105
class NoSuchKernel(KeyError):
    """KeyError subclass raised when a kernel name cannot be resolved.

    The requested name is kept on the ``name`` attribute.
    """

    def __init__(self, name):
        self.name = name

    def __str__(self):
        template = "No such kernel named {}"
        return template.format(self.name)
112
113
class KernelSpecManager(LoggingConfigurable):
    """Finds kernel specs in a list of directories and installs or removes them."""

    kernel_spec_class = Type(KernelSpec, config=True,
        help="""The kernel spec class.  This is configurable to allow
        subclassing of the KernelSpecManager for customized behavior.
        """
    )

    ensure_native_kernel = Bool(True, config=True,
        help="""If there is no Python kernelspec registered and the IPython
        kernel is available, ensure it is added to the spec list.
        """
    )

    # Base data directory; defaults to the value of jupyter_data_dir().
    data_dir = Unicode()
    def _data_dir_default(self):
        return jupyter_data_dir()

    # Per-user kernels directory (<data_dir>/kernels by default); this is the
    # install location chosen by _get_destination_dir() when user=True.
    user_kernel_dir = Unicode()
    def _user_kernel_dir_default(self):
        return pjoin(self.data_dir, 'kernels')

    whitelist = Set(config=True,
        help="""Whitelist of allowed kernel names.

        By default, all installed kernels are allowed.
        """
    )
    # Directories are scanned in list order and find_kernel_specs() keeps the
    # first resource directory found for each kernel name, so an entry earlier
    # in the list shadows a same-named kernel in a later one.
    kernel_dirs = List(
        help="List of kernel directories to search. Earlier ones take priority over later."
    )
145    def _kernel_dirs_default(self):
146        dirs = jupyter_path('kernels')
147        # At some point, we should stop adding .ipython/kernels to the path,
148        # but the cost to keeping it is very small.
149        try:
150            from IPython.paths import get_ipython_dir
151        except ImportError:
152            try:
153                from IPython.utils.path import get_ipython_dir
154            except ImportError:
155                # no IPython, no ipython dir
156                get_ipython_dir = None
157        if get_ipython_dir is not None:
158            dirs.append(os.path.join(get_ipython_dir(), 'kernels'))
159        return dirs
160
161    def find_kernel_specs(self):
162        """Returns a dict mapping kernel names to resource directories."""
163        d = {}
164        for kernel_dir in self.kernel_dirs:
165            kernels = _list_kernels_in(kernel_dir)
166            for kname, spec in kernels.items():
167                if kname not in d:
168                    self.log.debug("Found kernel %s in %s", kname, kernel_dir)
169                    d[kname] = spec
170
171        if self.ensure_native_kernel and NATIVE_KERNEL_NAME not in d:
172            try:
173                from ipykernel.kernelspec import RESOURCES
174                self.log.debug("Native kernel (%s) available from %s",
175                               NATIVE_KERNEL_NAME, RESOURCES)
176                d[NATIVE_KERNEL_NAME] = RESOURCES
177            except ImportError:
178                self.log.warning("Native kernel (%s) is not available", NATIVE_KERNEL_NAME)
179
180        if self.whitelist:
181            # filter if there's a whitelist
182            d = {name:spec for name,spec in d.items() if name in self.whitelist}
183        return d
184        # TODO: Caching?
185
186    def _get_kernel_spec_by_name(self, kernel_name, resource_dir):
187        """ Returns a :class:`KernelSpec` instance for a given kernel_name
188        and resource_dir.
189        """
190        if kernel_name == NATIVE_KERNEL_NAME:
191            try:
192                from ipykernel.kernelspec import RESOURCES, get_kernel_dict
193            except ImportError:
194                # It should be impossible to reach this, but let's play it safe
195                pass
196            else:
197                if resource_dir == RESOURCES:
198                    return self.kernel_spec_class(resource_dir=resource_dir, **get_kernel_dict())
199
200        return self.kernel_spec_class.from_resource_dir(resource_dir)
201
202    def _find_spec_directory(self, kernel_name):
203        """Find the resource directory of a named kernel spec"""
204        for kernel_dir in [kd for kd in self.kernel_dirs if os.path.isdir(kd)]:
205            files = os.listdir(kernel_dir)
206            for f in files:
207                path = pjoin(kernel_dir, f)
208                if f.lower() == kernel_name and _is_kernel_dir(path):
209                    return path
210
211        if kernel_name == NATIVE_KERNEL_NAME:
212            try:
213                from ipykernel.kernelspec import RESOURCES
214            except ImportError:
215                pass
216            else:
217                return RESOURCES
218
219    def get_kernel_spec(self, kernel_name):
220        """Returns a :class:`KernelSpec` instance for the given kernel_name.
221
222        Raises :exc:`NoSuchKernel` if the given kernel name is not found.
223        """
224        if not _is_valid_kernel_name(kernel_name):
225            self.log.warning("Kernelspec name %r is invalid: %s", kernel_name,
226                             _kernel_name_description)
227
228        resource_dir = self._find_spec_directory(kernel_name.lower())
229        if resource_dir is None:
230            raise NoSuchKernel(kernel_name)
231
232        return self._get_kernel_spec_by_name(kernel_name, resource_dir)
233
234    def get_all_specs(self):
235        """Returns a dict mapping kernel names to kernelspecs.
236
237        Returns a dict of the form::
238
239            {
240              'kernel_name': {
241                'resource_dir': '/path/to/kernel_name',
242                'spec': {"the spec itself": ...}
243              },
244              ...
245            }
246        """
247        d = self.find_kernel_specs()
248        res = {}
249        for kname, resource_dir in d.items():
250            try:
251                if self.__class__ is KernelSpecManager:
252                    spec = self._get_kernel_spec_by_name(kname, resource_dir)
253                else:
254                    # avoid calling private methods in subclasses,
255                    # which may have overridden find_kernel_specs
256                    # and get_kernel_spec, but not the newer get_all_specs
257                    spec = self.get_kernel_spec(kname)
258
259                res[kname] = {
260                    "resource_dir": resource_dir,
261                    "spec": spec.to_dict()
262                }
263            except Exception:
264                self.log.warning("Error loading kernelspec %r", kname, exc_info=True)
265        return res
266
267    def remove_kernel_spec(self, name):
268        """Remove a kernel spec directory by name.
269
270        Returns the path that was deleted.
271        """
272        save_native = self.ensure_native_kernel
273        try:
274            self.ensure_native_kernel = False
275            specs = self.find_kernel_specs()
276        finally:
277            self.ensure_native_kernel = save_native
278        spec_dir = specs[name]
279        self.log.debug("Removing %s", spec_dir)
280        if os.path.islink(spec_dir):
281            os.remove(spec_dir)
282        else:
283            shutil.rmtree(spec_dir)
284        return spec_dir
285
286    def _get_destination_dir(self, kernel_name, user=False, prefix=None):
287        if user:
288            return os.path.join(self.user_kernel_dir, kernel_name)
289        elif prefix:
290            return os.path.join(os.path.abspath(prefix), 'share', 'jupyter', 'kernels', kernel_name)
291        else:
292            return os.path.join(SYSTEM_JUPYTER_PATH[0], 'kernels', kernel_name)
293
294
295    def install_kernel_spec(self, source_dir, kernel_name=None, user=False,
296                            replace=None, prefix=None):
297        """Install a kernel spec by copying its directory.
298
299        If ``kernel_name`` is not given, the basename of ``source_dir`` will
300        be used.
301
302        If ``user`` is False, it will attempt to install into the systemwide
303        kernel registry. If the process does not have appropriate permissions,
304        an :exc:`OSError` will be raised.
305
306        If ``prefix`` is given, the kernelspec will be installed to
307        PREFIX/share/jupyter/kernels/KERNEL_NAME. This can be sys.prefix
308        for installation inside virtual or conda envs.
309        """
310        source_dir = source_dir.rstrip('/\\')
311        if not kernel_name:
312            kernel_name = os.path.basename(source_dir)
313        kernel_name = kernel_name.lower()
314        if not _is_valid_kernel_name(kernel_name):
315            raise ValueError("Invalid kernel name %r.  %s" % (kernel_name, _kernel_name_description))
316
317        if user and prefix:
318            raise ValueError("Can't specify both user and prefix. Please choose one or the other.")
319
320        if replace is not None:
321            warnings.warn(
322                "replace is ignored. Installing a kernelspec always replaces an existing installation",
323                DeprecationWarning,
324                stacklevel=2,
325            )
326
327        destination = self._get_destination_dir(kernel_name, user=user, prefix=prefix)
328        self.log.debug('Installing kernelspec in %s', destination)
329
330        kernel_dir = os.path.dirname(destination)
331        if kernel_dir not in self.kernel_dirs:
332            self.log.warning("Installing to %s, which is not in %s. The kernelspec may not be found.",
333                kernel_dir, self.kernel_dirs,
334            )
335
336        if os.path.isdir(destination):
337            self.log.info('Removing existing kernelspec in %s', destination)
338            shutil.rmtree(destination)
339
340        shutil.copytree(source_dir, destination)
341        self.log.info('Installed kernelspec %s in %s', kernel_name, destination)
342        return destination
343
344    def install_native_kernel_spec(self, user=False):
345        """DEPRECATED: Use ipykernel.kernelspec.install"""
346        warnings.warn("install_native_kernel_spec is deprecated."
347            " Use ipykernel.kernelspec import install.", stacklevel=2)
348        from ipykernel.kernelspec import install
349        install(self, user=user)
350
351
def find_kernel_specs():
    """Returns a dict mapping kernel names to resource directories.

    Convenience wrapper that delegates to a freshly created
    :class:`KernelSpecManager`.
    """
    manager = KernelSpecManager()
    return manager.find_kernel_specs()
355
def get_kernel_spec(kernel_name):
    """Returns a :class:`KernelSpec` instance for the given kernel_name.

    Convenience wrapper that delegates to a freshly created
    :class:`KernelSpecManager`.

    Raises KeyError if the given kernel name is not found.
    """
    manager = KernelSpecManager()
    return manager.get_kernel_spec(kernel_name)
362
def install_kernel_spec(source_dir, kernel_name=None, user=False, replace=None,
                        prefix=None):
    # Thin wrapper around KernelSpecManager.install_kernel_spec on a freshly
    # created manager.
    # ``replace`` defaults to None, matching the manager method: the method
    # emits a DeprecationWarning whenever it receives a non-None value, so a
    # default of False would warn on every call, even when the caller never
    # passed the argument.
    return KernelSpecManager().install_kernel_spec(source_dir, kernel_name,
                                                    user, replace, prefix)
367
# The module-level wrapper reuses the docstring of the manager method it calls.
install_kernel_spec.__doc__ = KernelSpecManager.install_kernel_spec.__doc__
369
def install_native_kernel_spec(user=False):
    # Thin wrapper: delegate to a freshly created KernelSpecManager.
    manager = KernelSpecManager()
    return manager.install_native_kernel_spec(user=user)

# Reuse the docstring of the manager method for the module-level wrapper.
install_native_kernel_spec.__doc__ = KernelSpecManager.install_native_kernel_spec.__doc__
374