1# Copyright (C) 2015-2021 Regents of the University of California
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15Batch system for running Toil workflows on Kubernetes.
16
Only useful with network-based job stores, like AWSJobStore.
18
Within non-privileged Kubernetes containers, additional Docker containers
cannot yet be launched. That functionality will need to wait for user-mode
Docker.
22"""
23import base64
24import datetime
25import getpass
26import logging
27import os
28import pickle
29import string
30import subprocess
31import sys
32import tempfile
33import time
34import uuid
35
36import kubernetes
37import pytz
38import urllib3
39from kubernetes.client.rest import ApiException
40
41from toil import applianceSelf
42from toil.batchSystems.abstractBatchSystem import (EXIT_STATUS_UNAVAILABLE_VALUE,
43                                                   BatchJobExitReason,
44                                                   BatchSystemCleanupSupport,
45                                                   UpdatedBatchJobInfo)
46from toil.common import Toil
47from toil.job import JobDescription
48from toil.lib.conversions import human2bytes
49from toil.lib.retry import ErrorCondition, retry
50from toil.resource import Resource
51from toil.statsAndLogging import configure_root_logger, set_log_level
52
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Exception classes that this module treats as possibly-transient failures
# when talking to Kubernetes. The list is handed to the @retry decorator and
# is consulted by is_retryable_kubernetes_error().
retryable_kubernetes_errors = [urllib3.exceptions.MaxRetryError,
                               urllib3.exceptions.ProtocolError,
                               ApiException]
57
58
def is_retryable_kubernetes_error(e):
    """
    Decide whether an exception raised while talking to Kubernetes is one
    that Toil treats as retryable.

    :param e: the exception instance to classify.

    :return: True if e is an instance of any class listed in
             retryable_kubernetes_errors, False otherwise.
    :rtype: bool
    """
    return any(isinstance(e, error_type) for error_type in retryable_kubernetes_errors)
68
69
def slow_down(seconds):
    """
    Enforce a strictly positive minimum on a job duration.

    Completed Toil jobs are not allowed to have taken 0 seconds, but
    Kubernetes timestamps are rounded to the second, so a pod can report
    identical start and end times. This clamps such a duration up to the
    smallest positive step Toil will accept.

    :param float seconds: Kubernetes timestamp difference

    :return: seconds, unless it is smaller than sys.float_info.epsilon, in
             which case sys.float_info.epsilon is returned instead
    :rtype: float
    """

    minimum = sys.float_info.epsilon
    return minimum if minimum > seconds else seconds
85
86
def utc_now():
    """
    Return a datetime in the UTC timezone corresponding to right now.

    The time is requested directly as an aware datetime rather than by
    stamping a timezone onto the naive result of datetime.datetime.utcnow(),
    which is deprecated as of Python 3.12.

    :return: the current moment, with tzinfo set to pytz.UTC.
    :rtype: datetime.datetime
    """
    return datetime.datetime.now(tz=pytz.UTC)
90
91
92class KubernetesBatchSystem(BatchSystemCleanupSupport):
93    @classmethod
94    def supportsAutoDeployment(cls):
95        return True
96
97    def __init__(self, config, maxCores, maxMemory, maxDisk):
98        super(KubernetesBatchSystem, self).__init__(config, maxCores, maxMemory, maxDisk)
99
100        # Turn down log level for Kubernetes modules and dependencies.
101        # Otherwise if we are at debug log level, we dump every
102        # request/response to Kubernetes, including tokens which we shouldn't
103        # reveal on CI.
104        logging.getLogger('kubernetes').setLevel(logging.ERROR)
105        logging.getLogger('requests_oauthlib').setLevel(logging.ERROR)
106
107        # This will hold the last time our Kubernetes credentials were refreshed
108        self.credential_time = None
109        # And this will hold our cache of API objects
110        self._apis = {}
111
112        # Get our namespace (and our Kubernetes credentials to make sure they exist)
113        self.namespace = self._api('namespace')
114
115        # Decide if we are going to mount a Kubernetes host path as /tmp in the workers.
116        # If we do this and the work dir is the default of the temp dir, caches will be shared.
117        self.host_path = config.kubernetesHostPath
118        if self.host_path is None and os.environ.get("TOIL_KUBERNETES_HOST_PATH", None) is not None:
119            # We can also take it from an environment variable
120            self.host_path = os.environ.get("TOIL_KUBERNETES_HOST_PATH")
121
122        # Make a Kubernetes-acceptable version of our username: not too long,
123        # and all lowercase letters, numbers, or - or .
124        acceptableChars = set(string.ascii_lowercase + string.digits + '-.')
125
126        # Use TOIL_KUBERNETES_OWNER if present in env var
127        if os.environ.get("TOIL_KUBERNETES_OWNER", None) is not None:
128            username = os.environ.get("TOIL_KUBERNETES_OWNER")
129        else:
130            username = ''.join([c for c in getpass.getuser().lower() if c in acceptableChars])[:100]
131
132        self.uniqueID = uuid.uuid4()
133
134        # Create a prefix for jobs, starting with our username
135        self.jobPrefix = '{}-toil-{}-'.format(username, self.uniqueID)
136
137        # Instead of letting Kubernetes assign unique job names, we assign our
138        # own based on a numerical job ID. This functionality is managed by the
139        # BatchSystemLocalSupport.
140
141        # Here is where we will store the user script resource object if we get one.
142        self.userScript = None
143
144        # Ge the image to deploy from Toil's configuration
145        self.dockerImage = applianceSelf()
146
147        # Try and guess what Toil work dir the workers will use.
148        # We need to be able to provision (possibly shared) space there.
149        self.workerWorkDir = Toil.getToilWorkDir(config.workDir)
150        if (config.workDir is None and
151            os.getenv('TOIL_WORKDIR') is None and
152            self.workerWorkDir == tempfile.gettempdir()):
153
154            # We defaulted to the system temp directory. But we think the
155            # worker Dockerfiles will make them use /var/lib/toil instead.
156            # TODO: Keep this in sync with the Dockerfile.
157            self.workerWorkDir = '/var/lib/toil'
158
159        # Get the name of the AWS secret, if any, to mount in containers.
160        # TODO: have some way to specify this (env var?)!
161        self.awsSecretName = os.environ.get("TOIL_AWS_SECRET_NAME", None)
162
163        # Set this to True to enable the experimental wait-for-job-update code
164        # TODO: Make this an environment variable?
165        self.enableWatching = os.environ.get("KUBE_WATCH_ENABLED", False)
166
167        self.runID = 'toil-{}'.format(self.uniqueID)
168
169        self.jobIds = set()
170
171
172    def _api(self, kind, max_age_seconds = 5 * 60):
173        """
174        The Kubernetes module isn't clever enough to renew its credentials when
175        they are about to expire. See
176        https://github.com/kubernetes-client/python/issues/741.
177
178        We work around this by making sure that every time we are about to talk
179        to Kubernetes, we have fresh credentials. And we do that by reloading
180        the config and replacing our Kubernetes API objects before we do any
181        Kubernetes things.
182
183        TODO: We can still get in trouble if a single watch or listing loop
184        goes on longer than our credentials last, though.
185
186        This method is the Right Way to get any Kubernetes API. You call it
187        with the API you want ('batch', 'core', or 'customObjects') and it
188        returns an API object with guaranteed fresh credentials.
189
190        It also recognizes 'namespace' and returns our namespace as a string.
191
192        max_age_seconds needs to be << your cluster's credential expiry time.
193        """
194
195        now = utc_now()
196
197        if self.credential_time is None or (now - self.credential_time).total_seconds() > max_age_seconds:
198            # Credentials need a refresh
199            try:
200                # Load ~/.kube/config or KUBECONFIG
201                kubernetes.config.load_kube_config()
202                # Worked. We're using kube config
203                config_source = 'kube'
204            except kubernetes.config.ConfigException:
205                # Didn't work. Try pod-based credentials in case we are in a pod.
206                try:
207                    kubernetes.config.load_incluster_config()
208                    # Worked. We're using in_cluster config
209                    config_source = 'in_cluster'
210                except kubernetes.config.ConfigException:
211                    raise RuntimeError('Could not load Kubernetes configuration from ~/.kube/config, $KUBECONFIG, or current pod.')
212
213            # Now fill in the API objects with these credentials
214            self._apis['batch'] = kubernetes.client.BatchV1Api()
215            self._apis['core'] = kubernetes.client.CoreV1Api()
216            self._apis['customObjects'] = kubernetes.client.CustomObjectsApi()
217
218            # And save the time
219            self.credential_time = now
220        if kind == 'namespace':
221            # We just need the namespace string
222            if config_source == 'in_cluster':
223                # Our namespace comes from a particular file.
224                with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r') as fh:
225                    return fh.read().strip()
226            else:
227                # Find all contexts and the active context.
228                # The active context gets us our namespace.
229                contexts, activeContext = kubernetes.config.list_kube_config_contexts()
230                if not contexts:
231                    raise RuntimeError("No Kubernetes contexts available in ~/.kube/config or $KUBECONFIG")
232
233                # Identify the namespace to work in
234                return activeContext.get('context', {}).get('namespace', 'default')
235
236        else:
237            # We need an API object
238            try:
239                return self._apis[kind]
240            except KeyError:
241                raise RuntimeError("Unknown Kubernetes API type: {}".format(kind))
242
243    @retry(errors=retryable_kubernetes_errors)
244    def _try_kubernetes(self, method, *args, **kwargs):
245        """
246        Kubernetes API can end abruptly and fail when it could dynamically backoff and retry.
247
248        For example, calling self._api('batch').create_namespaced_job(self.namespace, job),
249        Kubernetes can behave inconsistently and fail given a large job. See
250        https://github.com/DataBiosphere/toil/issues/2884.
251
252        This function gives Kubernetes more time to try an executable api.
253        """
254        return method(*args, **kwargs)
255
256    @retry(errors=retryable_kubernetes_errors + [
257               ErrorCondition(
258                   error=ApiException,
259                   error_codes=[404],
260                   retry_on_this_condition=False
261               )])
262    def _try_kubernetes_expecting_gone(self, method, *args, **kwargs):
263        """
264        Same as _try_kubernetes, but raises 404 errors as soon as they are
265        encountered (because we are waiting for them) instead of retrying on
266        them.
267        """
268        return method(*args, **kwargs)
269
270    def _try_kubernetes_stream(self, method, *args, **kwargs):
271        """
272        Kubernetes kubernetes.watch.Watch().stream() streams can fail and raise
273        errors. We don't want to have those errors fail the entire workflow, so
274        we handle them here.
275
276        When you want to stream the results of a Kubernetes API method, call
277        this instead of stream().
278
279        To avoid having to do our own timeout logic, we finish the watch early
280        if it produces an error.
281        """
282
283        w = kubernetes.watch.Watch()
284
285        # We will set this to bypass our second catch in the case of user errors.
286        userError = False
287
288        try:
289            for item in w.stream(method, *args, **kwargs):
290                # For everything the watch stream gives us
291                try:
292                    # Show the item to user code
293                    yield item
294                except Exception as e:
295                    # If we get an error from user code, skip our catch around
296                    # the Kubernetes generator.
297                    userError = True
298                    raise
299        except Exception as e:
300            # If we get an error
301            if userError:
302                # It wasn't from the Kubernetes watch generator. Pass it along.
303                raise
304            else:
305                # It was from the Kubernetes watch generator we manage.
306                if is_retryable_kubernetes_error(e):
307                    # This is just cloud weather.
308                    # TODO: We will also get an APIError if we just can't code good against Kubernetes. So make sure to warn.
309                    logger.warning("Received error from Kubernetes watch stream: %s", e)
310                    # Just end the watch.
311                    return
312                else:
313                    # Something actually weird is happening.
314                    raise
315
316
317    def setUserScript(self, userScript):
318        logger.info('Setting user script for deployment: {}'.format(userScript))
319        self.userScript = userScript
320
321    # setEnv is provided by BatchSystemSupport, updates self.environment
322
323    def _create_affinity(self, preemptable: bool) -> kubernetes.client.V1Affinity:
324        """
325        Make a V1Affinity that places pods appropriately depending on if they
326        tolerate preemptable nodes or not.
327        """
328
329        # Describe preemptable nodes
330
331        # There's no labeling standard for knowing which nodes are
332        # preemptable across different cloud providers/Kubernetes clusters,
333        # so we use the labels that EKS uses. Toil-managed Kubernetes
334        # clusters also use this label. If we come to support more kinds of
335        # preemptable nodes, we will need to add more labels to avoid here.
336        preemptable_label = "eks.amazonaws.com/capacityType"
337        preemptable_value = "SPOT"
338
339        non_spot = [kubernetes.client.V1NodeSelectorRequirement(key=preemptable_label,
340                                                                operator='NotIn',
341                                                                values=[preemptable_value])]
342        unspecified = [kubernetes.client.V1NodeSelectorRequirement(key=preemptable_label,
343                                                                   operator='DoesNotExist')]
344        # These are OR'd
345        node_selector_terms = [kubernetes.client.V1NodeSelectorTerm(match_expressions=non_spot),
346                               kubernetes.client.V1NodeSelectorTerm(match_expressions=unspecified)]
347        node_selector = kubernetes.client.V1NodeSelector(node_selector_terms=node_selector_terms)
348
349
350        if preemptable:
351            # We can put this job anywhere. But we would be smart to prefer
352            # preemptable nodes first, if available, so we don't block any
353            # non-preemptable jobs.
354            node_preference = kubernetes.client.V1PreferredSchedulingTerm(weight=1, preference=node_selector)
355
356            node_affinity = kubernetes.client.V1NodeAffinity(preferred_during_scheduling_ignored_during_execution=[node_preference])
357        else:
358            # We need to add some selector stuff to keep the job off of
359            # nodes that might be preempted.
360            node_affinity = kubernetes.client.V1NodeAffinity(required_during_scheduling_ignored_during_execution=node_selector)
361
362        # Make the node affinity into an overall affinity
363        return kubernetes.client.V1Affinity(node_affinity=node_affinity)
364
365    def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSpec:
366        """
367        Make the specification for a pod that can execute the given job.
368        """
369
370        # Make a job dict to send to the executor.
371        # First just wrap the command and the environment to run it in
372        job = {'command': jobDesc.command,
373               'environment': self.environment.copy()}
374        # TODO: query customDockerInitCmd to respect TOIL_CUSTOM_DOCKER_INIT_COMMAND
375
376        if self.userScript is not None:
377            # If there's a user script resource be sure to send it along
378            job['userScript'] = self.userScript
379
380        # Encode it in a form we can send in a command-line argument. Pickle in
381        # the highest protocol to prevent mixed-Python-version workflows from
382        # trying to work. Make sure it is text so we can ship it to Kubernetes
383        # via JSON.
384        encodedJob = base64.b64encode(pickle.dumps(job, pickle.HIGHEST_PROTOCOL)).decode('utf-8')
385
386        # The Kubernetes API makes sense only in terms of the YAML format. Objects
387        # represent sections of the YAML files. Except from our point of view, all
388        # the internal nodes in the YAML structure are named and typed.
389
390        # For docs, start at the root of the job hierarchy:
391        # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md
392
393        # Make a definition for the container's resource requirements.
394        # Add on a bit for Kubernetes overhead (Toil worker's memory, hot deployed
395        # user scripts).
396        # Kubernetes needs some lower limit of memory to run the pod at all without
397        # OOMing. We also want to provision some extra space so that when
398        # we test _isPodStuckOOM we never get True unless the job has
399        # exceeded jobDesc.memory.
400        requirements_dict = {'cpu': jobDesc.cores,
401                             'memory': jobDesc.memory + 1024 * 1024 * 512,
402                             'ephemeral-storage': jobDesc.disk + 1024 * 1024 * 512}
403        # Use the requirements as the limits, for predictable behavior, and because
404        # the UCSC Kubernetes admins want it that way.
405        limits_dict = requirements_dict
406        resources = kubernetes.client.V1ResourceRequirements(limits=limits_dict,
407                                                             requests=requirements_dict)
408
409        # Collect volumes and mounts
410        volumes = []
411        mounts = []
412
413        if self.host_path is not None:
414            # Provision Toil WorkDir from a HostPath volume, to share with other pods
415            host_path_volume_name = 'workdir'
416            # Use type='Directory' to fail if the host directory doesn't exist already.
417            host_path_volume_source = kubernetes.client.V1HostPathVolumeSource(path=self.host_path, type='Directory')
418            host_path_volume = kubernetes.client.V1Volume(name=host_path_volume_name,
419                                                         host_path=host_path_volume_source)
420            volumes.append(host_path_volume)
421            host_path_volume_mount = kubernetes.client.V1VolumeMount(mount_path=self.workerWorkDir, name=host_path_volume_name)
422            mounts.append(host_path_volume_mount)
423        else:
424            # Provision Toil WorkDir as an ephemeral volume
425            ephemeral_volume_name = 'workdir'
426            ephemeral_volume_source = kubernetes.client.V1EmptyDirVolumeSource()
427            ephemeral_volume = kubernetes.client.V1Volume(name=ephemeral_volume_name,
428                                                          empty_dir=ephemeral_volume_source)
429            volumes.append(ephemeral_volume)
430            ephemeral_volume_mount = kubernetes.client.V1VolumeMount(mount_path=self.workerWorkDir, name=ephemeral_volume_name)
431            mounts.append(ephemeral_volume_mount)
432
433        if self.awsSecretName is not None:
434            # Also mount an AWS secret, if provided.
435            # TODO: make this generic somehow
436            secret_volume_name = 's3-credentials'
437            secret_volume_source = kubernetes.client.V1SecretVolumeSource(secret_name=self.awsSecretName)
438            secret_volume = kubernetes.client.V1Volume(name=secret_volume_name,
439                                                       secret=secret_volume_source)
440            volumes.append(secret_volume)
441            secret_volume_mount = kubernetes.client.V1VolumeMount(mount_path='/root/.aws', name=secret_volume_name)
442            mounts.append(secret_volume_mount)
443
444        # Make a container definition
445        container = kubernetes.client.V1Container(command=['_toil_kubernetes_executor', encodedJob],
446                                                  image=self.dockerImage,
447                                                  name="runner-container",
448                                                  resources=resources,
449                                                  volume_mounts=mounts)
450        # Wrap the container in a spec
451        pod_spec = kubernetes.client.V1PodSpec(containers=[container],
452                                               volumes=volumes,
453                                               restart_policy="Never")
454        # Tell the spec where to land
455        pod_spec.affinity = self._create_affinity(jobDesc.preemptable)
456
457        return pod_spec
458
459
460    def issueBatchJob(self, jobDesc):
461        # TODO: get a sensible self.maxCores, etc. so we can checkResourceRequest.
462        # How do we know if the cluster will autoscale?
463
464        # Try the job as local
465        localID = self.handleLocalJob(jobDesc)
466        if localID:
467            # It is a local job
468            return localID
469        else:
470            # We actually want to send to the cluster
471
472            # Check resource requirements (managed by BatchSystemSupport)
473            self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk)
474
475            # Make a pod that describes running the job
476            pod_spec = self._create_pod_spec(jobDesc)
477
478            # Make a batch system scope job ID
479            jobID = self.getNextJobID()
480            # Make a unique name
481            jobName = self.jobPrefix + str(jobID)
482
483            # Make metadata to label the job/pod with info.
484            # Don't let the cluster autoscaler evict any Toil jobs.
485            metadata = kubernetes.client.V1ObjectMeta(name=jobName,
486                                                      labels={"toil_run": self.runID},
487                                                      annotations={"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"})
488
489            # Wrap the spec in a template
490            template = kubernetes.client.V1PodTemplateSpec(spec=pod_spec, metadata=metadata)
491
492            # Make another spec for the job, asking to run the template with no backoff
493            job_spec = kubernetes.client.V1JobSpec(template=template, backoff_limit=0)
494
495            # And make the actual job
496            job = kubernetes.client.V1Job(spec=job_spec,
497                                          metadata=metadata,
498                                          api_version="batch/v1",
499                                          kind="Job")
500
501            # Make the job
502            launched = self._try_kubernetes(self._api('batch').create_namespaced_job, self.namespace, job)
503
504            logger.debug('Launched job: %s', jobName)
505
506            return jobID
507
508    def _ourJobObject(self, onlySucceeded=False):
509        """
510        Yield Kubernetes V1Job objects that we are responsible for that the
511        cluster knows about.
512
513        Doesn't support a free-form selector, because there's only about 3
514        things jobs can be selected on: https://stackoverflow.com/a/55808444
515
516        :param bool onlySucceeded: restrict results to succeeded jobs.
517        :param int limit: max results to yield.
518        """
519
520        # We need to page through the list from the cluster with a continuation
521        # token. These expire after about 5 minutes. If we use an expired one,
522        # we get a 410 error and a new token, and we can use the new token to
523        # get the rest of the list, but the list will be updated.
524        #
525        # TODO: How to get the new token isn't clear. See
526        # https://github.com/kubernetes-client/python/issues/953. For now we
527        # will just throw an error if we don't get to the end of the list in
528        # time.
529
530        token = None
531
532        while True:
533            # We can't just pass e.g. a None continue token when there isn't
534            # one, because the Kubernetes module reads its kwargs dict and
535            # cares about presence/absence. So we build a dict to send.
536            kwargs = {}
537
538            if token is not None:
539                kwargs['_continue'] = token
540
541            if onlySucceeded:
542                results =  self._try_kubernetes(self._api('batch').list_namespaced_job, self.namespace,
543                                                label_selector="toil_run={}".format(self.runID), field_selector="status.successful==1", **kwargs)
544            else:
545                results = self._try_kubernetes(self._api('batch').list_namespaced_job, self.namespace,
546                                                label_selector="toil_run={}".format(self.runID), **kwargs)
547            for job in results.items:
548                # This job belongs to us
549                yield job
550
551            # Remember the continuation token, if any
552            token = getattr(results.metadata, 'continue', None)
553
554            if token is None:
555                # There isn't one. We got everything.
556                break
557
558
559    def _ourPodObject(self):
560        """
561        Yield Kubernetes V1Pod objects that we are responsible for that the
562        cluster knows about.
563        """
564
565        token = None
566
567        while True:
568            # We can't just pass e.g. a None continue token when there isn't
569            # one, because the Kubernetes module reads its kwargs dict and
570            # cares about presence/absence. So we build a dict to send.
571            kwargs = {}
572
573            if token is not None:
574                kwargs['_continue'] = token
575
576            results = self._try_kubernetes(self._api('core').list_namespaced_pod, self.namespace, label_selector="toil_run={}".format(self.runID), **kwargs)
577
578            for pod in results.items:
579                yield pod
580            # Remember the continuation token, if any
581            token = getattr(results.metadata, 'continue', None)
582
583            if token is None:
584                # There isn't one. We got everything.
585                break
586
587
588    def _getPodForJob(self, jobObject):
589        """
590        Get the pod that belongs to the given job, or None if the job's pod is
591        missing. The pod knows about things like the job's exit code.
592
593        :param kubernetes.client.V1Job jobObject: a Kubernetes job to look up
594                                       pods for.
595
596        :return: The pod for the job, or None if no pod is found.
597        :rtype: kubernetes.client.V1Pod
598        """
599
600        token = None
601
602        # Work out what the return code was (which we need to get from the
603        # pods) We get the associated pods by querying on the label selector
604        # `job-name=JOBNAME`
605        query = 'job-name={}'.format(jobObject.metadata.name)
606
607        while True:
608            # We can't just pass e.g. a None continue token when there isn't
609            # one, because the Kubernetes module reads its kwargs dict and
610            # cares about presence/absence. So we build a dict to send.
611            kwargs = {'label_selector': query}
612            if token is not None:
613                kwargs['_continue'] = token
614            results = self._try_kubernetes(self._api('core').list_namespaced_pod, self.namespace, **kwargs)
615
616            for pod in results.items:
617                # Return the first pod we find
618                return pod
619
620            # Remember the continuation token, if any
621            token = getattr(results.metadata, 'continue', None)
622
623            if token is None:
624                # There isn't one. We got everything.
625                break
626
627        # If we get here, no pages had any pods.
628        return None
629
630    def _getLogForPod(self, podObject):
631        """
632        Get the log for a pod.
633
634        :param kubernetes.client.V1Pod podObject: a Kubernetes pod with one
635                                       container to get the log from.
636
637        :return: The log for the only container in the pod.
638        :rtype: str
639
640        """
641
642        return self._try_kubernetes(self._api('core').read_namespaced_pod_log, podObject.metadata.name,
643                                                         namespace=self.namespace)
644
645    def _isPodStuckOOM(self, podObject, minFreeBytes=1024 * 1024 * 2):
646        """
647        Poll the current memory usage for the pod from the cluster.
648
649        Return True if the pod looks to be in a soft/stuck out of memory (OOM)
650        state, where it is using too much memory to actually make progress, but
651        not enough to actually trigger the OOM killer to kill it. For some
652        large memory limits, on some Kubernetes clusters, pods can get stuck in
653        this state when their memory limits are high (approx. 200 Gi).
654
655        We operationalize "OOM" as having fewer than minFreeBytes bytes free.
656
657        We assume the pod has only one container, as Toil's pods do.
658
659        If the metrics service is not working, we treat the pod as not being
660        stuck OOM. Otherwise, we would kill all functioning jobs on clusters
661        where the metrics service is down or isn't installed.
662
663        :param kubernetes.client.V1Pod podObject: a Kubernetes pod with one
664                                       container to check up on.
665        :param int minFreeBytes: Minimum free bytes to not be OOM.
666
667        :return: True if the pod is OOM, False otherwise.
668        :rtype: bool
669        """
670
671        # Compose a query to get just the pod we care about
672        query = 'metadata.name=' + podObject.metadata.name
673
674        # Look for it, but manage our own exceptions
675        try:
676            # TODO: When the Kubernetes Python API actually wraps the metrics API, switch to that
677            response = self._api('customObjects').list_namespaced_custom_object('metrics.k8s.io', 'v1beta1',
678                                                                                self.namespace, 'pods',
679                                                                                field_selector=query)
680        except Exception as e:
681            # We couldn't talk to the metrics service on this attempt. We don't
682            # retry, but we also don't want to just ignore all errors. We only
683            # want to ignore errors we expect to see if the problem is that the
684            # metrics service is not working.
685            if type(e) in retryable_kubernetes_errors:
686                # This is the sort of error we would expect from an overloaded
687                # Kubernetes or a dead metrics service.
688                # We can't tell that the pod is stuck, so say that it isn't.
689                logger.warning("Could not query metrics service: %s", e)
690                return False
691            else:
692                raise
693
694        # Pull out the items
695        items = response.get('items', [])
696
697        if len(items) == 0:
698            # If there's no statistics we can't say we're stuck OOM
699            return False
700
701        # Assume the first result is the right one, because of the selector
702        # Assume it has exactly one pod, because we made it
703        containers = items[0].get('containers', [{}])
704
705        if len(containers) == 0:
706            # If there are no containers (because none have started yet?), we can't say we're stuck OOM
707            return False
708
709        # Otherwise, assume it just has one container.
710        # Grab the memory usage string, like 123Ki, and convert to bytes.
711        # If anything is missing, assume 0 bytes used.
712        bytesUsed = human2bytes(containers[0].get('usage', {}).get('memory', '0'))
713
714        # Also get the limit out of the pod object's spec
715        bytesAllowed = human2bytes(podObject.spec.containers[0].resources.limits['memory'])
716
717        if bytesAllowed - bytesUsed < minFreeBytes:
718            # This is too much!
719            logger.warning('Pod %s has used %d of %d bytes of memory; reporting as stuck due to OOM.',
720                           podObject.metadata.name, bytesUsed, bytesAllowed)
721
722            return True
723
724
725
726
727    def _getIDForOurJob(self, jobObject):
728        """
729        Get the JobID number that belongs to the given job that we own.
730
731        :param kubernetes.client.V1Job jobObject: a Kubernetes job object that is a job we issued.
732
733        :return: The JobID for the job.
734        :rtype: int
735        """
736
737        return int(jobObject.metadata.name[len(self.jobPrefix):])
738
739
740    def getUpdatedBatchJob(self, maxWait):
741
742        entry = datetime.datetime.now()
743
744        result = self._getUpdatedBatchJobImmediately()
745
746        if result is not None or maxWait == 0:
747            # We got something on the first try, or we only get one try
748            return result
749
750        # Otherwise we need to maybe wait.
751        if self.enableWatching:
752            for event in self._try_kubernetes_stream(self._api('batch').list_namespaced_job, self.namespace,
753                                                        label_selector="toil_run={}".format(self.runID),
754                                                        timeout_seconds=maxWait):
755                # Grab the metadata data, ID, the list of conditions of the current job, and the total pods
756                jobObject = event['object']
757                jobID = int(jobObject.metadata.name[len(self.jobPrefix):])
758                jobObjectListConditions =jobObject.status.conditions
759                totalPods = jobObject.status.active + jobObject.status.finished + jobObject.status.failed
760                # Exit Reason defaults to 'Successfully Finsihed` unless said otherwise
761                exitReason = BatchJobExitReason.FINISHED
762                exitCode = 0
763
764                # Check if there are any active pods
765                if jobObject.status.acitve > 0:
766                    logger.info("%s has %d pods running" % jobObject.metadata.name, jobObject.status.active)
767                    continue
768                elif jobObject.status.failed > 0 or jobObject.status.finished > 0:
769                    # No more active pods in the current job ; must be finished
770                    logger.info("%s RESULTS -> Succeeded: %d Failed:%d Active:%d" % jobObject.metadata.name,
771                                                                jobObject.status.succeeded, jobObject.status.failed, jobObject.status.active)
772                    # Get termination information of job
773                    termination = jobObjectListConditions[0]
774                    # Log out succeess/failure given a reason
775                    logger.info("%s REASON: %s", termination.type, termination.reason)
776
777                    # Log out reason of failure and pod exit code
778                    if jobObject.status.failed > 0:
779                        exitReason = BatchJobExitReason.FAILED
780                        pod = self._getPodForJob(jobObject)
781                        logger.debug("Failed job %s", str(jobObject))
782                        logger.warning("Failed Job Message: %s", termination.message)
783                        exitCode = pod.status.container_statuses[0].state.terminated.exit_code
784
785                    runtime = slow_down((termination.completion_time - termination.start_time).total_seconds())
786                    result = UpdatedBatchJobInfo(jobID=jobID, exitStatus=exitCode, wallTime=runtime, exitReason=exitReason)
787
788                    if (exitReason == BatchJobExitReason.FAILED) or (jobObject.status.finished == totalPods):
789                        # Cleanup if job is all finished or there was a pod that failed
790                        self._try_kubernetes(self._api('batch').delete_namespaced_job,
791                                            jobObject.metadata.name,
792                                            self.namespace,
793                                            propagation_policy='Foreground')
794                        self._waitForJobDeath(jobObject.metadata.name)
795                        return result
796                    continue
797                else:
798                    # Job is not running/updating ; no active, successful, or failed pods yet
799                    logger.debug("Job %s -> %s" % (jobObject.metadata.name, jobObjectListConditions[0].reason))
800                    # Pod could be pending; don't say it's lost.
801                    continue
802        else:
803            # Try polling instead
804            while result is None and (datetime.datetime.now() - entry).total_seconds() < maxWait:
805                # We still have nothing and we haven't hit the timeout.
806
807                # Poll
808                result = self._getUpdatedBatchJobImmediately()
809
810                if result is None:
811                    # Still nothing. Wait a second, or some fraction of our max wait time.
812                    time.sleep(min(maxWait/2, 1.0))
813
814            # When we get here, either we found something or we ran out of time
815            return result
816
817
818    def _getUpdatedBatchJobImmediately(self):
819        """
820        Return None if no updated (completed or failed) batch job is currently
821        available, and jobID, exitCode, runtime if such a job can be found.
822        """
823
824        # See if a local batch job has updated and is available immediately
825        local_tuple = self.getUpdatedLocalJob(0)
826        if local_tuple:
827            # If so, use it
828            return local_tuple
829
830        # Otherwise we didn't get a local job.
831
832        # Go looking for other jobs
833
834        # Everybody else does this with a queue and some other thread that
835        # is responsible for populating it.
836        # But we can just ask kubernetes now.
837
838        # Find a job that is done, failed, or stuck
839        jobObject = None
840        # Put 'done', 'failed', or 'stuck' here
841        chosenFor = ''
842
843        for j in self._ourJobObject(onlySucceeded=True):
844            # Look for succeeded jobs because that's the only filter Kubernetes has
845            jobObject = j
846            chosenFor = 'done'
847
848        if jobObject is None:
849            for j in self._ourJobObject():
850                # If there aren't any succeeded jobs, scan all jobs
851                # See how many times each failed
852                failCount = getattr(j.status, 'failed', 0)
853                if failCount is None:
854                    # Make sure it is an int
855                    failCount = 0
856                if failCount > 0:
857                    # Take the first failed one you find
858                    jobObject = j
859                    chosenFor = 'failed'
860                    break
861
862        if jobObject is None:
863            # If no jobs are failed, look for jobs with pods that are stuck for various reasons.
864            for j in self._ourJobObject():
865                pod = self._getPodForJob(j)
866
867                if pod is None:
868                    # Skip jobs with no pod
869                    continue
870
871                # Containers can get stuck in Waiting with reason ImagePullBackOff
872
873                # Get the statuses of the pod's containers
874                containerStatuses = pod.status.container_statuses
875                if containerStatuses is None or len(containerStatuses) == 0:
876                    # Pod exists but has no container statuses
877                    # This happens when the pod is just "Scheduled"
878                    # ("PodScheduled" status event) and isn't actually starting
879                    # to run yet.
880                    # Can't be stuck in ImagePullBackOff
881                    continue
882
883                waitingInfo = getattr(getattr(pod.status.container_statuses[0], 'state', None), 'waiting', None)
884                if waitingInfo is not None and waitingInfo.reason == 'ImagePullBackOff':
885                    # Assume it will never finish, even if the registry comes back or whatever.
886                    # We can get into this state when we send in a non-existent image.
887                    # See https://github.com/kubernetes/kubernetes/issues/58384
888                    jobObject = j
889                    chosenFor = 'stuck'
890                    logger.warning('Failing stuck job; did you try to run a non-existent Docker image?'
891                                   ' Check TOIL_APPLIANCE_SELF.')
892                    break
893
894                # Pods can also get stuck nearly but not quite out of memory,
895                # if their memory limits are high and they try to exhaust them.
896
897                if self._isPodStuckOOM(pod):
898                    # We found a job that probably should be OOM! Report it as stuck.
899                    # Polling function takes care of the logging.
900                    jobObject = j
901                    chosenFor = 'stuck'
902                    break
903
904        if jobObject is None:
905            # Say we couldn't find anything
906            return None
907
908
909        # Otherwise we got something.
910
911        # Work out what the job's ID was (whatever came after our name prefix)
912        jobID = int(jobObject.metadata.name[len(self.jobPrefix):])
913
914        # Work out when the job was submitted. If the pod fails before actually
915        # running, this is the basis for our runtime.
916        jobSubmitTime = getattr(jobObject.status, 'start_time', None)
917        if jobSubmitTime is None:
918            # If somehow this is unset, say it was just now.
919            jobSubmitTime = utc_now()
920
921        # Grab the pod
922        pod = self._getPodForJob(jobObject)
923
924        if pod is not None:
925            if chosenFor == 'done' or chosenFor == 'failed':
926                # The job actually finished or failed
927
928                # Get the statuses of the pod's containers
929                containerStatuses = pod.status.container_statuses
930
931                # Get when the pod started (reached the Kubelet) as a datetime
932                startTime = getattr(pod.status, 'start_time', None)
933                if startTime is None:
934                    # If the pod never made it to the kubelet to get a
935                    # start_time, say it was when the job was submitted.
936                    startTime = jobSubmitTime
937
938                if containerStatuses is None or len(containerStatuses) == 0:
939                    # No statuses available.
940                    # This happens when a pod is "Scheduled". But how could a
941                    # 'done' or 'failed' pod be merely "Scheduled"?
942                    # Complain so we can find out.
943                    logger.warning('Exit code and runtime unavailable; pod has no container statuses')
944                    logger.warning('Pod: %s', str(pod))
945                    exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
946                    # Say it stopped now and started when it was scheduled/submitted.
947                    # We still need a strictly positive runtime.
948                    runtime = slow_down((utc_now() - startTime).total_seconds())
949                else:
950                    # Get the termination info from the pod's main (only) container
951                    terminatedInfo = getattr(getattr(containerStatuses[0], 'state', None), 'terminated', None)
952                    if terminatedInfo is None:
953                        logger.warning('Exit code and runtime unavailable; pod stopped without container terminating')
954                        logger.warning('Pod: %s', str(pod))
955                        exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
956                        # Say it stopped now and started when it was scheduled/submitted.
957                        # We still need a strictly positive runtime.
958                        runtime = slow_down((utc_now() - startTime).total_seconds())
959                    else:
960                        # Extract the exit code
961                        exitCode = terminatedInfo.exit_code
962
963                        # Compute how long the job actually ran for (subtract
964                        # datetimes). We need to look at the pod's start time
965                        # because the job's start time is just when the job is
966                        # created. And we need to look at the pod's end time
967                        # because the job only gets a completion time if
968                        # successful.
969                        runtime = slow_down((terminatedInfo.finished_at -
970                                             pod.status.start_time).total_seconds())
971
972                        if chosenFor == 'failed':
973                            # Warn the user with the failed pod's log
974                            # TODO: cut this down somehow?
975                            logger.warning('Log from failed pod: %s', self._getLogForPod(pod))
976
977            else:
978                # The job has gotten stuck
979
980                assert chosenFor == 'stuck'
981
982                # Synthesize an exit code
983                exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
984                # Say it ran from when the job was submitted to when the pod got stuck
985                runtime = slow_down((utc_now() - jobSubmitTime).total_seconds())
986        else:
987            # The pod went away from under the job.
988            logging.warning('Exit code and runtime unavailable; pod vanished')
989            exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
990            # Say it ran from when the job was submitted to when the pod vanished
991            runtime = slow_down((utc_now() - jobSubmitTime).total_seconds())
992
993
994        try:
995            # Delete the job and all dependents (pods), hoping to get a 404 if it's magically gone
996            self._try_kubernetes_expecting_gone(self._api('batch').delete_namespaced_job, jobObject.metadata.name,
997                                                self.namespace,
998                                                propagation_policy='Foreground')
999
1000            # That just kicks off the deletion process. Foreground doesn't
1001            # actually block. See
1002            # https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#foreground-cascading-deletion
1003            # We have to either wait until the deletion is done and we can't
1004            # see the job anymore, or ban the job from being "updated" again if
1005            # we see it. If we don't block on deletion, we can't use limit=1
1006            # on our query for succeeded jobs. So we poll for the job's
1007            # non-existence.
1008            self._waitForJobDeath(jobObject.metadata.name)
1009
1010        except ApiException as e:
1011            if e.status != 404:
1012                # Something is wrong, other than the job already being deleted.
1013                raise
1014            # Otherwise everything is fine and the job is gone.
1015
1016        # Return the one finished job we found
1017        return UpdatedBatchJobInfo(jobID=jobID, exitStatus=exitCode, wallTime=runtime, exitReason=None)
1018
1019    def _waitForJobDeath(self, jobName):
1020        """
1021        Block until the job with the given name no longer exists.
1022        """
1023
1024        # We do some exponential backoff on the polling
1025        # TODO: use a wait instead of polling?
1026        backoffTime = 0.1
1027        maxBackoffTime = 6.4
1028        while True:
1029            try:
1030                # Look for the job
1031                self._try_kubernetes_expecting_gone(self._api('batch').read_namespaced_job, jobName, self.namespace)
1032                # If we didn't 404, wait a bit with exponential backoff
1033                time.sleep(backoffTime)
1034                if backoffTime < maxBackoffTime:
1035                    backoffTime *= 2
1036            except ApiException as e:
1037                # We finally got a failure!
1038                if e.status != 404:
1039                    # But it wasn't due to the job being gone; something is wrong.
1040                    raise
1041                # It was a 404; the job is gone. Stop polling it.
1042                break
1043
1044    def shutdown(self):
1045
1046        # Shutdown local processes first
1047        self.shutdownLocal()
1048
1049
1050        # Kill all of our jobs and clean up pods that are associated with those jobs
1051        try:
1052            self._try_kubernetes_expecting_gone(self._api('batch').delete_collection_namespaced_job,
1053                                                            self.namespace,
1054                                                            label_selector="toil_run={}".format(self.runID),
1055                                                            propagation_policy='Background')
1056            logger.debug('Killed jobs with delete_collection_namespaced_job; cleaned up')
1057        except ApiException as e:
1058            if e.status != 404:
1059                # Anything other than a 404 is weird here.
1060                logger.error("Exception when calling BatchV1Api->delete_collection_namespaced_job: %s" % e)
1061
1062            # aggregate all pods and check if any pod has failed to cleanup or is orphaned.
1063            ourPods = self._ourPodObject()
1064
1065            for pod in ourPods:
1066                try:
1067                    if pod.status.phase == 'Failed':
1068                            logger.debug('Failed pod encountered at shutdown: %s', str(pod))
1069                    if pod.status.phase == 'Orphaned':
1070                            logger.debug('Orphaned pod encountered at shutdown: %s', str(pod))
1071                except:
1072                    # Don't get mad if that doesn't work.
1073                    pass
1074                try:
1075                    logger.debug('Cleaning up pod at shutdown: %s', str(pod))
1076                    respone = self._try_kubernetes_expecting_gone(self._api('core').delete_namespaced_pod,  pod.metadata.name,
1077                                        self.namespace,
1078                                        propagation_policy='Background')
1079                except ApiException as e:
1080                    if e.status != 404:
1081                        # Anything other than a 404 is weird here.
1082                        logger.error("Exception when calling CoreV1Api->delete_namespaced_pod: %s" % e)
1083
1084
1085    def _getIssuedNonLocalBatchJobIDs(self):
1086        """
1087        Get the issued batch job IDs that are not for local jobs.
1088        """
1089        jobIDs = []
1090        got_list = self._ourJobObject()
1091        for job in got_list:
1092            # Get the ID for each job
1093            jobIDs.append(self._getIDForOurJob(job))
1094        return jobIDs
1095
1096    def getIssuedBatchJobIDs(self):
1097        # Make sure to send the local jobs also
1098        return self._getIssuedNonLocalBatchJobIDs() + list(self.getIssuedLocalJobIDs())
1099
1100    def getRunningBatchJobIDs(self):
1101        # We need a dict from jobID (integer) to seconds it has been running
1102        secondsPerJob = dict()
1103        for job in self._ourJobObject():
1104            # Grab the pod for each job
1105            pod = self._getPodForJob(job)
1106
1107            if pod is None:
1108                # Jobs whose pods are gone are not running
1109                continue
1110
1111            if pod.status.phase == 'Running':
1112                # The job's pod is running
1113
1114                # The only time we have handy is when the pod got assigned to a
1115                # kubelet, which is technically before it started running.
1116                runtime = (utc_now() - pod.status.start_time).total_seconds()
1117
1118                # Save it under the stringified job ID
1119                secondsPerJob[self._getIDForOurJob(job)] = runtime
1120        # Mix in the local jobs
1121        secondsPerJob.update(self.getRunningLocalJobIDs())
1122        return secondsPerJob
1123
1124    def killBatchJobs(self, jobIDs):
1125
1126        # Kill all the ones that are local
1127        self.killLocalJobs(jobIDs)
1128
1129        # Clears workflow's jobs listed in jobIDs.
1130
1131        # First get the jobs we even issued non-locally
1132        issuedOnKubernetes = set(self._getIssuedNonLocalBatchJobIDs())
1133
1134        for jobID in jobIDs:
1135            # For each job we are supposed to kill
1136            if jobID not in issuedOnKubernetes:
1137                # It never went to Kubernetes (or wasn't there when we just
1138                # looked), so we can't kill it on Kubernetes.
1139                continue
1140            # Work out what the job would be named
1141            jobName = self.jobPrefix + str(jobID)
1142
1143            # Delete the requested job in the foreground.
1144            # This doesn't block, but it does delete expeditiously.
1145            response = self._try_kubernetes(self._api('batch').delete_namespaced_job, jobName,
1146                                                                self.namespace,
1147                                                                propagation_policy='Foreground')
1148            logger.debug('Killed job by request: %s', jobName)
1149
1150        for jobID in jobIDs:
1151            # Now we need to wait for all the jobs we killed to be gone.
1152
1153            # Work out what the job would be named
1154            jobName = self.jobPrefix + str(jobID)
1155
1156            # Block until it doesn't exist
1157            self._waitForJobDeath(jobName)
1158
def executor():
    """
    Main function of the _toil_kubernetes_executor entrypoint.

    Runs inside the Toil container.

    Responsible for setting up the user script and running the command for the
    job (which may in turn invoke the Toil worker entrypoint).

    Expects exactly one command line argument: a base64-encoded pickled dict
    with a 'command' key, and optionally 'environment' and 'userScript' keys.

    Always ends by exiting the process: with the child's exit code if the
    command ran, and with EXIT_STATUS_UNAVAILABLE_VALUE if it could not be run.
    """

    configure_root_logger()
    set_log_level("DEBUG")
    logger.debug("Starting executor")

    # If we don't manage to run the child, what should our exit code be?
    exit_code = EXIT_STATUS_UNAVAILABLE_VALUE

    if len(sys.argv) != 2:
        logger.error('Executor requires exactly one base64-encoded argument')
        sys.exit(exit_code)

    # Take in a base64-encoded pickled dict as our first argument and decode it
    try:
        # Make sure to encode the text arguments to bytes before base 64 decoding
        # NOTE(review): unpickling runs arbitrary code from the argument. This
        # presumably only ever comes from the Toil leader that created the
        # job; confirm nothing untrusted can set the container's arguments.
        job = pickle.loads(base64.b64decode(sys.argv[1].encode('utf-8')))
    except Exception:
        # Only catch real errors; let KeyboardInterrupt and SystemExit through.
        logger.error('Exception while unpickling task: ', exc_info=True)
        sys.exit(exit_code)

    if 'environment' in job:
        # Adopt the job environment into the executor.
        # This lets us use things like TOIL_WORKDIR when figuring out how to talk to other executors.
        logger.debug('Adopting environment: %s', str(job['environment'].keys()))
        for var, value in job['environment'].items():
            os.environ[var] = value

    # Set JTRES_ROOT and other global state needed for resource
    # downloading/deployment to work.
    # TODO: Every worker downloads resources independently.
    # We should have a way to share a resource directory.
    logger.debug('Preparing system for resource download')
    Resource.prepareSystem()
    try:
        if 'userScript' in job:
            job['userScript'].register()

        # Start the child process
        logger.debug("Invoking command: '%s'", job['command'])
        child = subprocess.Popen(job['command'],
                                 preexec_fn=os.setpgrp,
                                 shell=True)

        # Reproduce child's exit code
        exit_code = child.wait()

    except Exception:
        # The sys.exit() in the finally block below replaces any exception in
        # flight, so log it here or nobody will ever see why we failed.
        logger.exception('Exception while setting up or running the job command')

    finally:
        logger.debug('Cleaning up resources')
        # TODO: Change resource system to use a shared resource directory for everyone.
        # Then move this into worker cleanup somehow
        Resource.cleanSystem()
        logger.debug('Shutting down')
        sys.exit(exit_code)
1223