# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Batch system for running Toil workflows on Kubernetes.

Only useful with network-based job stores, like AWSJobStore.

Within non-privileged Kubernetes containers, additional Docker containers
cannot yet be launched. That functionality will need to wait for user-mode
Docker.
"""
import base64
import datetime
import getpass
import logging
import os
import pickle
import string
import subprocess
import sys
import tempfile
import time
import uuid

import kubernetes
import pytz
import urllib3
from kubernetes.client.rest import ApiException

from toil import applianceSelf
from toil.batchSystems.abstractBatchSystem import (EXIT_STATUS_UNAVAILABLE_VALUE,
                                                   BatchJobExitReason,
                                                   BatchSystemCleanupSupport,
                                                   UpdatedBatchJobInfo)
from toil.common import Toil
from toil.job import JobDescription
from toil.lib.conversions import human2bytes
from toil.lib.retry import ErrorCondition, retry
from toil.resource import Resource
from toil.statsAndLogging import configure_root_logger, set_log_level

logger = logging.getLogger(__name__)
retryable_kubernetes_errors = [urllib3.exceptions.MaxRetryError,
                               urllib3.exceptions.ProtocolError,
                               ApiException]


def is_retryable_kubernetes_error(e):
    """
    Determine whether Toil should retry or stop, given an exception thrown by
    Kubernetes.
    """
    for error in retryable_kubernetes_errors:
        if isinstance(e, error):
            return True
    return False


def slow_down(seconds):
    """
    Toil jobs that have completed are not allowed to have taken 0 seconds, but
    Kubernetes timestamps round things to the nearest second. It is possible
    in Kubernetes for a pod to have identical start and end timestamps.

    This function takes a possibly 0 job length in seconds and enforces a
    minimum length to satisfy Toil.

    :param float seconds: Kubernetes timestamp difference

    :return: seconds, or a small positive number if seconds is 0
    :rtype: float
    """

    return max(seconds, sys.float_info.epsilon)


def utc_now():
    """Return a datetime in the UTC timezone corresponding to right now."""
    return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)


class KubernetesBatchSystem(BatchSystemCleanupSupport):
    @classmethod
    def supportsAutoDeployment(cls):
        return True

    def __init__(self, config, maxCores, maxMemory, maxDisk):
        super(KubernetesBatchSystem, self).__init__(config, maxCores, maxMemory, maxDisk)

        # Turn down log level for Kubernetes modules and dependencies.
        # Otherwise if we are at debug log level, we dump every
        # request/response to Kubernetes, including tokens which we shouldn't
        # reveal on CI.
        logging.getLogger('kubernetes').setLevel(logging.ERROR)
        logging.getLogger('requests_oauthlib').setLevel(logging.ERROR)

        # This will hold the last time our Kubernetes credentials were refreshed
        self.credential_time = None
        # And this will hold our cache of API objects
        self._apis = {}

        # Get our namespace (and our Kubernetes credentials to make sure they exist)
        self.namespace = self._api('namespace')

        # Decide if we are going to mount a Kubernetes host path as /tmp in the workers.
        # If we do this and the work dir is the default of the temp dir, caches will be shared.
        self.host_path = config.kubernetesHostPath
        if self.host_path is None and os.environ.get("TOIL_KUBERNETES_HOST_PATH", None) is not None:
            # We can also take it from an environment variable
            self.host_path = os.environ.get("TOIL_KUBERNETES_HOST_PATH")

        # Make a Kubernetes-acceptable version of our username: not too long,
        # and all lowercase letters, numbers, or - or .
        acceptableChars = set(string.ascii_lowercase + string.digits + '-.')

        # Use TOIL_KUBERNETES_OWNER if present in env var
        if os.environ.get("TOIL_KUBERNETES_OWNER", None) is not None:
            username = os.environ.get("TOIL_KUBERNETES_OWNER")
        else:
            username = ''.join([c for c in getpass.getuser().lower() if c in acceptableChars])[:100]

        self.uniqueID = uuid.uuid4()

        # Create a prefix for jobs, starting with our username
        self.jobPrefix = '{}-toil-{}-'.format(username, self.uniqueID)

        # Instead of letting Kubernetes assign unique job names, we assign our
        # own based on a numerical job ID. This functionality is managed by the
        # BatchSystemLocalSupport.

        # Here is where we will store the user script resource object if we get one.
        self.userScript = None

        # Get the image to deploy from Toil's configuration
        self.dockerImage = applianceSelf()

        # Try and guess what Toil work dir the workers will use.
        # We need to be able to provision (possibly shared) space there.
        self.workerWorkDir = Toil.getToilWorkDir(config.workDir)
        if (config.workDir is None and
            os.getenv('TOIL_WORKDIR') is None and
            self.workerWorkDir == tempfile.gettempdir()):

            # We defaulted to the system temp directory. But we think the
            # worker Dockerfiles will make them use /var/lib/toil instead.
            # TODO: Keep this in sync with the Dockerfile.
            self.workerWorkDir = '/var/lib/toil'

        # Get the name of the AWS secret, if any, to mount in containers.
        # TODO: have some way to specify this (env var?)!
        self.awsSecretName = os.environ.get("TOIL_AWS_SECRET_NAME", None)

        # Set this to True to enable the experimental wait-for-job-update code
        # TODO: Make this an environment variable?
        self.enableWatching = os.environ.get("KUBE_WATCH_ENABLED", False)

        self.runID = 'toil-{}'.format(self.uniqueID)

        self.jobIds = set()
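
    # For reference, the optional environment variables consulted by __init__
    # above (the example values are illustrative only):
    #
    #   TOIL_KUBERNETES_OWNER      e.g. 'demo-user'; used to prefix job names
    #   TOIL_KUBERNETES_HOST_PATH  e.g. '/data/scratch'; host path mounted as
    #                              the Toil work dir so caches can be shared
    #   TOIL_AWS_SECRET_NAME       e.g. 'aws-credentials'; Kubernetes secret
    #                              mounted at /root/.aws in each job's pod
    #   KUBE_WATCH_ENABLED         e.g. '1'; enables the experimental
    #                              watch-based job update code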

    def _api(self, kind, max_age_seconds=5 * 60):
        """
        The Kubernetes module isn't clever enough to renew its credentials when
        they are about to expire. See
        https://github.com/kubernetes-client/python/issues/741.

        We work around this by making sure that every time we are about to talk
        to Kubernetes, we have fresh credentials. And we do that by reloading
        the config and replacing our Kubernetes API objects before we do any
        Kubernetes things.

        TODO: We can still get in trouble if a single watch or listing loop
        goes on longer than our credentials last, though.

        This method is the Right Way to get any Kubernetes API. You call it
        with the API you want ('batch', 'core', or 'customObjects') and it
        returns an API object with guaranteed fresh credentials.

        It also recognizes 'namespace' and returns our namespace as a string.

        max_age_seconds needs to be << your cluster's credential expiry time.
        """

        now = utc_now()

        if self.credential_time is None or (now - self.credential_time).total_seconds() > max_age_seconds:
            # Credentials need a refresh
            try:
                # Load ~/.kube/config or KUBECONFIG
                kubernetes.config.load_kube_config()
                # Worked. We're using kube config
                config_source = 'kube'
            except kubernetes.config.ConfigException:
                # Didn't work. Try pod-based credentials in case we are in a pod.
                try:
                    kubernetes.config.load_incluster_config()
                    # Worked. We're using in_cluster config
                    config_source = 'in_cluster'
                except kubernetes.config.ConfigException:
                    raise RuntimeError('Could not load Kubernetes configuration from ~/.kube/config, $KUBECONFIG, or current pod.')

            # Now fill in the API objects with these credentials
            self._apis['batch'] = kubernetes.client.BatchV1Api()
            self._apis['core'] = kubernetes.client.CoreV1Api()
            self._apis['customObjects'] = kubernetes.client.CustomObjectsApi()

            # And save the time
            self.credential_time = now
        if kind == 'namespace':
            # We just need the namespace string
            if config_source == 'in_cluster':
                # Our namespace comes from a particular file.
                with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", 'r') as fh:
                    return fh.read().strip()
            else:
                # Find all contexts and the active context.
                # The active context gets us our namespace.
                contexts, activeContext = kubernetes.config.list_kube_config_contexts()
                if not contexts:
                    raise RuntimeError("No Kubernetes contexts available in ~/.kube/config or $KUBECONFIG")

                # Identify the namespace to work in
                return activeContext.get('context', {}).get('namespace', 'default')

        else:
            # We need an API object
            try:
                return self._apis[kind]
            except KeyError:
                raise RuntimeError("Unknown Kubernetes API type: {}".format(kind))

    @retry(errors=retryable_kubernetes_errors)
    def _try_kubernetes(self, method, *args, **kwargs):
        """
        The Kubernetes API can fail abruptly when it could instead dynamically
        back off and retry.

        For example, when calling self._api('batch').create_namespaced_job(self.namespace, job),
        Kubernetes can behave inconsistently and fail given a large job. See
        https://github.com/DataBiosphere/toil/issues/2884.

        This function gives Kubernetes more time by retrying the API call.
        """
        return method(*args, **kwargs)

    @retry(errors=retryable_kubernetes_errors + [
        ErrorCondition(
            error=ApiException,
            error_codes=[404],
            retry_on_this_condition=False
        )])
    def _try_kubernetes_expecting_gone(self, method, *args, **kwargs):
        """
        Same as _try_kubernetes, but raises 404 errors as soon as they are
        encountered (because we are waiting for them) instead of retrying on
        them.
        """
        return method(*args, **kwargs)

    def _try_kubernetes_stream(self, method, *args, **kwargs):
        """
        kubernetes.watch.Watch().stream() streams can fail and raise errors.
        We don't want to have those errors fail the entire workflow, so we
        handle them here.

        When you want to stream the results of a Kubernetes API method, call
        this instead of stream().

        To avoid having to do our own timeout logic, we finish the watch early
        if it produces an error.
        """

        w = kubernetes.watch.Watch()

        # We will set this to bypass our second catch in the case of user errors.
        userError = False

        try:
            for item in w.stream(method, *args, **kwargs):
                # For everything the watch stream gives us
                try:
                    # Show the item to user code
                    yield item
                except Exception as e:
                    # If we get an error from user code, skip our catch around
                    # the Kubernetes generator.
                    userError = True
                    raise
        except Exception as e:
            # If we get an error
            if userError:
                # It wasn't from the Kubernetes watch generator. Pass it along.
                raise
            else:
                # It was from the Kubernetes watch generator we manage.
                if is_retryable_kubernetes_error(e):
                    # This is just cloud weather.
                    # TODO: We will also get an APIError if we just can't code good against Kubernetes. So make sure to warn.
                    logger.warning("Received error from Kubernetes watch stream: %s", e)
                    # Just end the watch.
                    return
                else:
                    # Something actually weird is happening.
                    raise

    def setUserScript(self, userScript):
        logger.info('Setting user script for deployment: {}'.format(userScript))
        self.userScript = userScript

    # setEnv is provided by BatchSystemSupport, updates self.environment
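
    # The affinity constructed below steers jobs between preemptable (spot)
    # and non-preemptable nodes using the EKS-style capacityType label. For
    # orientation, the required (non-preemptable) branch corresponds roughly
    # to this pod-spec YAML (a sketch, not emitted verbatim by this code):
    #
    #   affinity:
    #     nodeAffinity:
    #       requiredDuringSchedulingIgnoredDuringExecution:
    #         nodeSelectorTerms:
    #         - matchExpressions:
    #           - {key: eks.amazonaws.com/capacityType, operator: NotIn, values: [SPOT]}
    #         - matchExpressions:
    #           - {key: eks.amazonaws.com/capacityType, operator: DoesNotExist}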

    def _create_affinity(self, preemptable: bool) -> kubernetes.client.V1Affinity:
        """
        Make a V1Affinity that places pods appropriately depending on if they
        tolerate preemptable nodes or not.
        """

        # Describe preemptable nodes

        # There's no labeling standard for knowing which nodes are
        # preemptable across different cloud providers/Kubernetes clusters,
        # so we use the labels that EKS uses. Toil-managed Kubernetes
        # clusters also use this label. If we come to support more kinds of
        # preemptable nodes, we will need to add more labels to avoid here.
        preemptable_label = "eks.amazonaws.com/capacityType"
        preemptable_value = "SPOT"

        non_spot = [kubernetes.client.V1NodeSelectorRequirement(key=preemptable_label,
                                                                operator='NotIn',
                                                                values=[preemptable_value])]
        unspecified = [kubernetes.client.V1NodeSelectorRequirement(key=preemptable_label,
                                                                   operator='DoesNotExist')]
        # These are OR'd
        node_selector_terms = [kubernetes.client.V1NodeSelectorTerm(match_expressions=non_spot),
                               kubernetes.client.V1NodeSelectorTerm(match_expressions=unspecified)]
        node_selector = kubernetes.client.V1NodeSelector(node_selector_terms=node_selector_terms)

        if preemptable:
            # We can put this job anywhere. But we would be smart to prefer
            # preemptable nodes first, if available, so we don't block any
            # non-preemptable jobs.
            node_preference = kubernetes.client.V1PreferredSchedulingTerm(weight=1, preference=node_selector)

            node_affinity = kubernetes.client.V1NodeAffinity(preferred_during_scheduling_ignored_during_execution=[node_preference])
        else:
            # We need to add some selector stuff to keep the job off of
            # nodes that might be preempted.
            node_affinity = kubernetes.client.V1NodeAffinity(required_during_scheduling_ignored_during_execution=node_selector)

        # Make the node affinity into an overall affinity
        return kubernetes.client.V1Affinity(node_affinity=node_affinity)

    def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSpec:
        """
        Make the specification for a pod that can execute the given job.
        """

        # Make a job dict to send to the executor.
        # First just wrap the command and the environment to run it in
        job = {'command': jobDesc.command,
               'environment': self.environment.copy()}
        # TODO: query customDockerInitCmd to respect TOIL_CUSTOM_DOCKER_INIT_COMMAND

        if self.userScript is not None:
            # If there's a user script resource be sure to send it along
            job['userScript'] = self.userScript

        # Encode it in a form we can send in a command-line argument. Pickle in
        # the highest protocol to prevent mixed-Python-version workflows from
        # trying to work. Make sure it is text so we can ship it to Kubernetes
        # via JSON.
        encodedJob = base64.b64encode(pickle.dumps(job, pickle.HIGHEST_PROTOCOL)).decode('utf-8')

        # The Kubernetes API makes sense only in terms of the YAML format. Objects
        # represent sections of the YAML files. Except from our point of view, all
        # the internal nodes in the YAML structure are named and typed.

        # For docs, start at the root of the job hierarchy:
        # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md

        # Make a definition for the container's resource requirements.
        # Add on a bit for Kubernetes overhead (Toil worker's memory, hot deployed
        # user scripts).
        # Kubernetes needs some lower limit of memory to run the pod at all without
        # OOMing. We also want to provision some extra space so that when
        # we test _isPodStuckOOM we never get True unless the job has
        # exceeded jobDesc.memory.
        requirements_dict = {'cpu': jobDesc.cores,
                             'memory': jobDesc.memory + 1024 * 1024 * 512,
                             'ephemeral-storage': jobDesc.disk + 1024 * 1024 * 512}
        # Use the requirements as the limits, for predictable behavior, and because
        # the UCSC Kubernetes admins want it that way.
        limits_dict = requirements_dict
        resources = kubernetes.client.V1ResourceRequirements(limits=limits_dict,
                                                             requests=requirements_dict)

        # Collect volumes and mounts
        volumes = []
        mounts = []

        if self.host_path is not None:
            # Provision Toil WorkDir from a HostPath volume, to share with other pods
            host_path_volume_name = 'workdir'
            # Use type='Directory' to fail if the host directory doesn't exist already.
            host_path_volume_source = kubernetes.client.V1HostPathVolumeSource(path=self.host_path, type='Directory')
            host_path_volume = kubernetes.client.V1Volume(name=host_path_volume_name,
                                                          host_path=host_path_volume_source)
            volumes.append(host_path_volume)
            host_path_volume_mount = kubernetes.client.V1VolumeMount(mount_path=self.workerWorkDir, name=host_path_volume_name)
            mounts.append(host_path_volume_mount)
        else:
            # Provision Toil WorkDir as an ephemeral volume
            ephemeral_volume_name = 'workdir'
            ephemeral_volume_source = kubernetes.client.V1EmptyDirVolumeSource()
            ephemeral_volume = kubernetes.client.V1Volume(name=ephemeral_volume_name,
                                                          empty_dir=ephemeral_volume_source)
            volumes.append(ephemeral_volume)
            ephemeral_volume_mount = kubernetes.client.V1VolumeMount(mount_path=self.workerWorkDir, name=ephemeral_volume_name)
            mounts.append(ephemeral_volume_mount)

        if self.awsSecretName is not None:
            # Also mount an AWS secret, if provided.
            # TODO: make this generic somehow
            secret_volume_name = 's3-credentials'
            secret_volume_source = kubernetes.client.V1SecretVolumeSource(secret_name=self.awsSecretName)
            secret_volume = kubernetes.client.V1Volume(name=secret_volume_name,
                                                       secret=secret_volume_source)
            volumes.append(secret_volume)
            secret_volume_mount = kubernetes.client.V1VolumeMount(mount_path='/root/.aws', name=secret_volume_name)
            mounts.append(secret_volume_mount)

        # Make a container definition
        container = kubernetes.client.V1Container(command=['_toil_kubernetes_executor', encodedJob],
                                                  image=self.dockerImage,
                                                  name="runner-container",
                                                  resources=resources,
                                                  volume_mounts=mounts)
        # Wrap the container in a spec
        pod_spec = kubernetes.client.V1PodSpec(containers=[container],
                                               volumes=volumes,
                                               restart_policy="Never")
        # Tell the spec where to land
        pod_spec.affinity = self._create_affinity(jobDesc.preemptable)

        return pod_spec
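
    # For debugging, the jobs and pods created from these specs can be listed
    # with kubectl via the toil_run label that issueBatchJob() below attaches
    # (a sketch; the run ID shown is hypothetical):
    #
    #     kubectl get jobs -l toil_run=toil-<uuid>
    #     kubectl get pods -l toil_run=toil-<uuid>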

    def issueBatchJob(self, jobDesc):
        # TODO: get a sensible self.maxCores, etc. so we can checkResourceRequest.
        # How do we know if the cluster will autoscale?

        # Try the job as local
        localID = self.handleLocalJob(jobDesc)
        if localID:
            # It is a local job
            return localID
        else:
            # We actually want to send to the cluster

            # Check resource requirements (managed by BatchSystemSupport)
            self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk)

            # Make a pod that describes running the job
            pod_spec = self._create_pod_spec(jobDesc)

            # Make a batch system scope job ID
            jobID = self.getNextJobID()
            # Make a unique name
            jobName = self.jobPrefix + str(jobID)

            # Make metadata to label the job/pod with info.
            # Don't let the cluster autoscaler evict any Toil jobs.
            metadata = kubernetes.client.V1ObjectMeta(name=jobName,
                                                      labels={"toil_run": self.runID},
                                                      annotations={"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"})

            # Wrap the spec in a template
            template = kubernetes.client.V1PodTemplateSpec(spec=pod_spec, metadata=metadata)

            # Make another spec for the job, asking to run the template with no backoff
            job_spec = kubernetes.client.V1JobSpec(template=template, backoff_limit=0)

            # And make the actual job
            job = kubernetes.client.V1Job(spec=job_spec,
                                          metadata=metadata,
                                          api_version="batch/v1",
                                          kind="Job")

            # Make the job
            launched = self._try_kubernetes(self._api('batch').create_namespaced_job, self.namespace, job)

            logger.debug('Launched job: %s', jobName)

            return jobID

    def _ourJobObject(self, onlySucceeded=False):
        """
        Yield Kubernetes V1Job objects that we are responsible for that the
        cluster knows about.

        Doesn't support a free-form selector, because there's only about 3
        things jobs can be selected on: https://stackoverflow.com/a/55808444

        :param bool onlySucceeded: restrict results to succeeded jobs.
        """

        # We need to page through the list from the cluster with a continuation
        # token. These expire after about 5 minutes. If we use an expired one,
        # we get a 410 error and a new token, and we can use the new token to
        # get the rest of the list, but the list will be updated.
        #
        # TODO: How to get the new token isn't clear. See
        # https://github.com/kubernetes-client/python/issues/953. For now we
        # will just throw an error if we don't get to the end of the list in
        # time.

        token = None

        while True:
            # We can't just pass e.g. a None continue token when there isn't
            # one, because the Kubernetes module reads its kwargs dict and
            # cares about presence/absence. So we build a dict to send.
            kwargs = {}

            if token is not None:
                kwargs['_continue'] = token

            if onlySucceeded:
                results = self._try_kubernetes(self._api('batch').list_namespaced_job, self.namespace,
                                               label_selector="toil_run={}".format(self.runID),
                                               field_selector="status.successful==1", **kwargs)
            else:
                results = self._try_kubernetes(self._api('batch').list_namespaced_job, self.namespace,
                                               label_selector="toil_run={}".format(self.runID), **kwargs)
            for job in results.items:
                # This job belongs to us
                yield job

            # Remember the continuation token, if any
            token = getattr(results.metadata, 'continue', None)

            if token is None:
                # There isn't one. We got everything.
                break

    def _ourPodObject(self):
        """
        Yield Kubernetes V1Pod objects that we are responsible for that the
        cluster knows about.
        """

        token = None

        while True:
            # We can't just pass e.g. a None continue token when there isn't
            # one, because the Kubernetes module reads its kwargs dict and
            # cares about presence/absence. So we build a dict to send.
            kwargs = {}

            if token is not None:
                kwargs['_continue'] = token

            results = self._try_kubernetes(self._api('core').list_namespaced_pod, self.namespace,
                                           label_selector="toil_run={}".format(self.runID), **kwargs)

            for pod in results.items:
                yield pod
            # Remember the continuation token, if any
            token = getattr(results.metadata, 'continue', None)

            if token is None:
                # There isn't one. We got everything.
                break

    def _getPodForJob(self, jobObject):
        """
        Get the pod that belongs to the given job, or None if the job's pod is
        missing. The pod knows about things like the job's exit code.

        :param kubernetes.client.V1Job jobObject: a Kubernetes job to look up
               pods for.

        :return: The pod for the job, or None if no pod is found.
        :rtype: kubernetes.client.V1Pod
        """

        token = None

        # Work out what the return code was (which we need to get from the
        # pods). We get the associated pods by querying on the label selector
        # `job-name=JOBNAME`.
        query = 'job-name={}'.format(jobObject.metadata.name)

        while True:
            # We can't just pass e.g. a None continue token when there isn't
            # one, because the Kubernetes module reads its kwargs dict and
            # cares about presence/absence. So we build a dict to send.
            kwargs = {'label_selector': query}
            if token is not None:
                kwargs['_continue'] = token
            results = self._try_kubernetes(self._api('core').list_namespaced_pod, self.namespace, **kwargs)

            for pod in results.items:
                # Return the first pod we find
                return pod

            # Remember the continuation token, if any
            token = getattr(results.metadata, 'continue', None)

            if token is None:
                # There isn't one. We got everything.
                break

        # If we get here, no pages had any pods.
        return None

    def _getLogForPod(self, podObject):
        """
        Get the log for a pod.

        :param kubernetes.client.V1Pod podObject: a Kubernetes pod with one
               container to get the log from.

        :return: The log for the only container in the pod.
        :rtype: str
        """

        return self._try_kubernetes(self._api('core').read_namespaced_pod_log, podObject.metadata.name,
                                    namespace=self.namespace)

    def _isPodStuckOOM(self, podObject, minFreeBytes=1024 * 1024 * 2):
        """
        Poll the current memory usage for the pod from the cluster.

        Return True if the pod looks to be in a soft/stuck out of memory (OOM)
        state, where it is using too much memory to actually make progress, but
        not enough to actually trigger the OOM killer to kill it. For some
        large memory limits, on some Kubernetes clusters, pods can get stuck in
        this state when their memory limits are high (approx. 200 Gi).

        We operationalize "OOM" as having fewer than minFreeBytes bytes free.

        We assume the pod has only one container, as Toil's pods do.

        If the metrics service is not working, we treat the pod as not being
        stuck OOM. Otherwise, we would kill all functioning jobs on clusters
        where the metrics service is down or isn't installed.

        :param kubernetes.client.V1Pod podObject: a Kubernetes pod with one
               container to check up on.
        :param int minFreeBytes: Minimum free bytes to not be OOM.

        :return: True if the pod is OOM, False otherwise.
        :rtype: bool
        """

        # Compose a query to get just the pod we care about
        query = 'metadata.name=' + podObject.metadata.name

        # Look for it, but manage our own exceptions
        try:
            # TODO: When the Kubernetes Python API actually wraps the metrics API, switch to that
            response = self._api('customObjects').list_namespaced_custom_object('metrics.k8s.io', 'v1beta1',
                                                                                self.namespace, 'pods',
                                                                                field_selector=query)
        except Exception as e:
            # We couldn't talk to the metrics service on this attempt. We don't
            # retry, but we also don't want to just ignore all errors. We only
            # want to ignore errors we expect to see if the problem is that the
            # metrics service is not working.
            if type(e) in retryable_kubernetes_errors:
                # This is the sort of error we would expect from an overloaded
                # Kubernetes or a dead metrics service.
                # We can't tell that the pod is stuck, so say that it isn't.
                logger.warning("Could not query metrics service: %s", e)
                return False
            else:
                raise

        # Pull out the items
        items = response.get('items', [])

        if len(items) == 0:
            # If there's no statistics we can't say we're stuck OOM
            return False

        # Assume the first result is the right one, because of the selector.
        # Assume it has exactly one pod, because we made it.
        containers = items[0].get('containers', [{}])

        if len(containers) == 0:
            # If there are no containers (because none have started yet?), we can't say we're stuck OOM
            return False

        # Otherwise, assume it just has one container.
        # Grab the memory usage string, like 123Ki, and convert to bytes.
        # If anything is missing, assume 0 bytes used.
        bytesUsed = human2bytes(containers[0].get('usage', {}).get('memory', '0'))

        # Also get the limit out of the pod object's spec
        bytesAllowed = human2bytes(podObject.spec.containers[0].resources.limits['memory'])

        if bytesAllowed - bytesUsed < minFreeBytes:
            # This is too much!
            logger.warning('Pod %s has used %d of %d bytes of memory; reporting as stuck due to OOM.',
                           podObject.metadata.name, bytesUsed, bytesAllowed)

            return True

        # Otherwise, there is still enough memory free.
        return False

    def _getIDForOurJob(self, jobObject):
        """
        Get the JobID number that belongs to the given job that we own.

        :param kubernetes.client.V1Job jobObject: a Kubernetes job object that is a job we issued.

        :return: The JobID for the job.
        :rtype: int
        """

        return int(jobObject.metadata.name[len(self.jobPrefix):])

    def getUpdatedBatchJob(self, maxWait):

        entry = datetime.datetime.now()

        result = self._getUpdatedBatchJobImmediately()

        if result is not None or maxWait == 0:
            # We got something on the first try, or we only get one try
            return result

        # Otherwise we need to maybe wait.
        if self.enableWatching:
            for event in self._try_kubernetes_stream(self._api('batch').list_namespaced_job, self.namespace,
                                                     label_selector="toil_run={}".format(self.runID),
                                                     timeout_seconds=maxWait):
                # Grab the metadata, the ID, the list of conditions of the current job, and the total pods
                jobObject = event['object']
                jobID = int(jobObject.metadata.name[len(self.jobPrefix):])
                jobObjectListConditions = jobObject.status.conditions
                # Kubernetes leaves these counts unset (None) until they are nonzero
                active = jobObject.status.active or 0
                succeeded = jobObject.status.succeeded or 0
                failed = jobObject.status.failed or 0
                totalPods = active + succeeded + failed
                # Exit reason defaults to 'Successfully Finished' unless said otherwise
                exitReason = BatchJobExitReason.FINISHED
                exitCode = 0

                # Check if there are any active pods
                if active > 0:
                    logger.info("%s has %d pods running", jobObject.metadata.name, active)
                    continue
                elif failed > 0 or succeeded > 0:
                    # No more active pods in the current job; must be finished
                    logger.info("%s RESULTS -> Succeeded: %d Failed: %d Active: %d",
                                jobObject.metadata.name, succeeded, failed, active)
                    # Get termination information of the job
                    termination = jobObjectListConditions[0]
                    # Log success/failure along with the reason
                    logger.info("%s REASON: %s", termination.type, termination.reason)

                    # Log the reason for failure and the pod's exit code
                    if failed > 0:
                        exitReason = BatchJobExitReason.FAILED
                        pod = self._getPodForJob(jobObject)
                        logger.debug("Failed job %s", str(jobObject))
                        logger.warning("Failed Job Message: %s", termination.message)
                        exitCode = pod.status.container_statuses[0].state.terminated.exit_code

                    # The terminal condition's transition time marks when the job finished
                    runtime = slow_down((termination.last_transition_time - jobObject.status.start_time).total_seconds())
                    result = UpdatedBatchJobInfo(jobID=jobID, exitStatus=exitCode, wallTime=runtime, exitReason=exitReason)

                    if (exitReason == BatchJobExitReason.FAILED) or (succeeded == totalPods):
                        # Clean up if the job is all finished or there was a pod that failed
                        self._try_kubernetes(self._api('batch').delete_namespaced_job,
                                             jobObject.metadata.name,
                                             self.namespace,
                                             propagation_policy='Foreground')
                        self._waitForJobDeath(jobObject.metadata.name)
                        return result
                    continue
                else:
                    # Job is not running/updating; no active, successful, or failed pods yet
                    condition = jobObjectListConditions[0] if jobObjectListConditions else None
                    logger.debug("Job %s -> %s", jobObject.metadata.name,
                                 condition.reason if condition else "(no conditions yet)")
                    # Pod could be pending; don't say it's lost.
                    continue
        else:
            # Try polling instead
            while result is None and (datetime.datetime.now() - entry).total_seconds() < maxWait:
                # We still have nothing and we haven't hit the timeout.

                # Poll
                result = self._getUpdatedBatchJobImmediately()

                if result is None:
                    # Still nothing. Wait a second, or some fraction of our max wait time.
                    time.sleep(min(maxWait / 2, 1.0))

            # When we get here, either we found something or we ran out of time
            return result

    def _getUpdatedBatchJobImmediately(self):
        """
        Return None if no updated (completed or failed) batch job is currently
        available, and jobID, exitCode, runtime if such a job can be found.
        """

        # See if a local batch job has updated and is available immediately
        local_tuple = self.getUpdatedLocalJob(0)
        if local_tuple:
            # If so, use it
            return local_tuple

        # Otherwise we didn't get a local job.

        # Go looking for other jobs

        # Everybody else does this with a queue and some other thread that
        # is responsible for populating it.
        # But we can just ask Kubernetes now.

        # Find a job that is done, failed, or stuck
        jobObject = None
        # Put 'done', 'failed', or 'stuck' here
        chosenFor = ''

        for j in self._ourJobObject(onlySucceeded=True):
            # Look for succeeded jobs because that's the only filter Kubernetes has
            jobObject = j
            chosenFor = 'done'

        if jobObject is None:
            for j in self._ourJobObject():
                # If there aren't any succeeded jobs, scan all jobs
                # See how many times each failed
                failCount = getattr(j.status, 'failed', 0)
                if failCount is None:
                    # Make sure it is an int
                    failCount = 0
                if failCount > 0:
                    # Take the first failed one you find
                    jobObject = j
                    chosenFor = 'failed'
                    break

        if jobObject is None:
            # If no jobs are failed, look for jobs with pods that are stuck for various reasons.
            for j in self._ourJobObject():
                pod = self._getPodForJob(j)

                if pod is None:
                    # Skip jobs with no pod
                    continue

                # Containers can get stuck in Waiting with reason ImagePullBackOff

                # Get the statuses of the pod's containers
                containerStatuses = pod.status.container_statuses
                if containerStatuses is None or len(containerStatuses) == 0:
                    # Pod exists but has no container statuses
                    # This happens when the pod is just "Scheduled"
                    # ("PodScheduled" status event) and isn't actually starting
                    # to run yet.
                    # Can't be stuck in ImagePullBackOff
                    continue

                waitingInfo = getattr(getattr(pod.status.container_statuses[0], 'state', None), 'waiting', None)
                if waitingInfo is not None and waitingInfo.reason == 'ImagePullBackOff':
                    # Assume it will never finish, even if the registry comes back or whatever.
                    # We can get into this state when we send in a non-existent image.
                    # See https://github.com/kubernetes/kubernetes/issues/58384
                    jobObject = j
                    chosenFor = 'stuck'
                    logger.warning('Failing stuck job; did you try to run a non-existent Docker image?'
                                   ' Check TOIL_APPLIANCE_SELF.')
                    break

                # Pods can also get stuck nearly but not quite out of memory,
                # if their memory limits are high and they try to exhaust them.

                if self._isPodStuckOOM(pod):
                    # We found a job that probably should be OOM! Report it as stuck.
                    # Polling function takes care of the logging.
                    jobObject = j
                    chosenFor = 'stuck'
                    break

        if jobObject is None:
            # Say we couldn't find anything
            return None

        # Otherwise we got something.

        # Work out what the job's ID was (whatever came after our name prefix)
        jobID = int(jobObject.metadata.name[len(self.jobPrefix):])

        # Work out when the job was submitted. If the pod fails before actually
        # running, this is the basis for our runtime.
        jobSubmitTime = getattr(jobObject.status, 'start_time', None)
        if jobSubmitTime is None:
            # If somehow this is unset, say it was just now.
            jobSubmitTime = utc_now()

        # Grab the pod
        pod = self._getPodForJob(jobObject)

        if pod is not None:
            if chosenFor == 'done' or chosenFor == 'failed':
                # The job actually finished or failed

                # Get the statuses of the pod's containers
                containerStatuses = pod.status.container_statuses

                # Get when the pod started (reached the Kubelet) as a datetime
                startTime = getattr(pod.status, 'start_time', None)
                if startTime is None:
                    # If the pod never made it to the kubelet to get a
                    # start_time, say it was when the job was submitted.
                    startTime = jobSubmitTime

                if containerStatuses is None or len(containerStatuses) == 0:
                    # No statuses available.
                    # This happens when a pod is "Scheduled". But how could a
                    # 'done' or 'failed' pod be merely "Scheduled"?
                    # Complain so we can find out.
                    logger.warning('Exit code and runtime unavailable; pod has no container statuses')
                    logger.warning('Pod: %s', str(pod))
                    exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
                    # Say it stopped now and started when it was scheduled/submitted.
                    # We still need a strictly positive runtime.
                    runtime = slow_down((utc_now() - startTime).total_seconds())
                else:
                    # Get the termination info from the pod's main (only) container
                    terminatedInfo = getattr(getattr(containerStatuses[0], 'state', None), 'terminated', None)
                    if terminatedInfo is None:
                        logger.warning('Exit code and runtime unavailable; pod stopped without container terminating')
                        logger.warning('Pod: %s', str(pod))
                        exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
                        # Say it stopped now and started when it was scheduled/submitted.
                        # We still need a strictly positive runtime.
                        runtime = slow_down((utc_now() - startTime).total_seconds())
                    else:
                        # Extract the exit code
                        exitCode = terminatedInfo.exit_code

                        # Compute how long the job actually ran for (subtract
                        # datetimes). We need to look at the pod's start time
                        # because the job's start time is just when the job is
                        # created. And we need to look at the pod's end time
                        # because the job only gets a completion time if
                        # successful.
                        runtime = slow_down((terminatedInfo.finished_at -
                                             pod.status.start_time).total_seconds())

                if chosenFor == 'failed':
                    # Warn the user with the failed pod's log
                    # TODO: cut this down somehow?
                    logger.warning('Log from failed pod: %s', self._getLogForPod(pod))

            else:
                # The job has gotten stuck

                assert chosenFor == 'stuck'

                # Synthesize an exit code
                exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
                # Say it ran from when the job was submitted to when the pod got stuck
                runtime = slow_down((utc_now() - jobSubmitTime).total_seconds())
        else:
            # The pod went away from under the job.
            logger.warning('Exit code and runtime unavailable; pod vanished')
            exitCode = EXIT_STATUS_UNAVAILABLE_VALUE
            # Say it ran from when the job was submitted to when the pod vanished
            runtime = slow_down((utc_now() - jobSubmitTime).total_seconds())

        try:
            # Delete the job and all dependents (pods), hoping to get a 404 if it's magically gone
            self._try_kubernetes_expecting_gone(self._api('batch').delete_namespaced_job, jobObject.metadata.name,
                                                self.namespace,
                                                propagation_policy='Foreground')

            # That just kicks off the deletion process. Foreground doesn't
            # actually block. See
            # https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#foreground-cascading-deletion
            # We have to either wait until the deletion is done and we can't
            # see the job anymore, or ban the job from being "updated" again if
            # we see it. If we don't block on deletion, we can't use limit=1
            # on our query for succeeded jobs. So we poll for the job's
            # non-existence.
            self._waitForJobDeath(jobObject.metadata.name)

        except ApiException as e:
            if e.status != 404:
                # Something is wrong, other than the job already being deleted.
                raise
            # Otherwise everything is fine and the job is gone.

        # Return the one finished job we found
        return UpdatedBatchJobInfo(jobID=jobID, exitStatus=exitCode, wallTime=runtime, exitReason=None)

    def _waitForJobDeath(self, jobName):
        """
        Block until the job with the given name no longer exists.
        """

        # We do some exponential backoff on the polling
        # TODO: use a wait instead of polling?
        backoffTime = 0.1
        maxBackoffTime = 6.4
        while True:
            try:
                # Look for the job
                self._try_kubernetes_expecting_gone(self._api('batch').read_namespaced_job, jobName, self.namespace)
                # If we didn't 404, wait a bit with exponential backoff
                time.sleep(backoffTime)
                if backoffTime < maxBackoffTime:
                    backoffTime *= 2
            except ApiException as e:
                # We finally got a failure!
                if e.status != 404:
                    # But it wasn't due to the job being gone; something is wrong.
                    raise
                # It was a 404; the job is gone. Stop polling it.
                break

    def shutdown(self):

        # Shutdown local processes first
        self.shutdownLocal()

        # Kill all of our jobs and clean up pods that are associated with those jobs
        try:
            self._try_kubernetes_expecting_gone(self._api('batch').delete_collection_namespaced_job,
                                                self.namespace,
                                                label_selector="toil_run={}".format(self.runID),
                                                propagation_policy='Background')
            logger.debug('Killed jobs with delete_collection_namespaced_job; cleaned up')
        except ApiException as e:
            if e.status != 404:
                # Anything other than a 404 is weird here.
                logger.error("Exception when calling BatchV1Api->delete_collection_namespaced_job: %s" % e)

        # Aggregate all pods and check if any pod has failed to clean up or is orphaned.
        ourPods = self._ourPodObject()

        for pod in ourPods:
            try:
                if pod.status.phase == 'Failed':
                    logger.debug('Failed pod encountered at shutdown: %s', str(pod))
                if pod.status.phase == 'Orphaned':
                    logger.debug('Orphaned pod encountered at shutdown: %s', str(pod))
            except:
                # Don't get mad if that doesn't work.
                pass
            try:
                logger.debug('Cleaning up pod at shutdown: %s', str(pod))
                response = self._try_kubernetes_expecting_gone(self._api('core').delete_namespaced_pod, pod.metadata.name,
                                                               self.namespace,
                                                               propagation_policy='Background')
            except ApiException as e:
                if e.status != 404:
                    # Anything other than a 404 is weird here.
                    logger.error("Exception when calling CoreV1Api->delete_namespaced_pod: %s" % e)

    def _getIssuedNonLocalBatchJobIDs(self):
        """
        Get the issued batch job IDs that are not for local jobs.
        """
        jobIDs = []
        got_list = self._ourJobObject()
        for job in got_list:
            # Get the ID for each job
            jobIDs.append(self._getIDForOurJob(job))
        return jobIDs

    def getIssuedBatchJobIDs(self):
        # Make sure to send the local jobs also
        return self._getIssuedNonLocalBatchJobIDs() + list(self.getIssuedLocalJobIDs())

    def getRunningBatchJobIDs(self):
        # We need a dict from jobID (integer) to seconds it has been running
        secondsPerJob = dict()
        for job in self._ourJobObject():
            # Grab the pod for each job
            pod = self._getPodForJob(job)

            if pod is None:
                # Jobs whose pods are gone are not running
                continue

            if pod.status.phase == 'Running':
                # The job's pod is running

                # The only time we have handy is when the pod got assigned to a
                # kubelet, which is technically before it started running.
                runtime = (utc_now() - pod.status.start_time).total_seconds()

                # Save it under the job ID
                secondsPerJob[self._getIDForOurJob(job)] = runtime
        # Mix in the local jobs
        secondsPerJob.update(self.getRunningLocalJobIDs())
        return secondsPerJob

    def killBatchJobs(self, jobIDs):

        # Kill all the ones that are local
        self.killLocalJobs(jobIDs)

        # Clear this workflow's jobs listed in jobIDs.

        # First get the jobs we even issued non-locally
        issuedOnKubernetes = set(self._getIssuedNonLocalBatchJobIDs())

        for jobID in jobIDs:
            # For each job we are supposed to kill
            if jobID not in issuedOnKubernetes:
                # It never went to Kubernetes (or wasn't there when we just
                # looked), so we can't kill it on Kubernetes.
                continue
            # Work out what the job would be named
            jobName = self.jobPrefix + str(jobID)

            # Delete the requested job in the foreground.
            # This doesn't block, but it does delete expeditiously.
            response = self._try_kubernetes(self._api('batch').delete_namespaced_job, jobName,
                                            self.namespace,
                                            propagation_policy='Foreground')
            logger.debug('Killed job by request: %s', jobName)

        for jobID in jobIDs:
            # Now we need to wait for all the jobs we killed to be gone.

            # Work out what the job would be named
            jobName = self.jobPrefix + str(jobID)

            # Block until it doesn't exist
            self._waitForJobDeath(jobName)


def executor():
    """
    Main function of the _toil_kubernetes_executor entrypoint.

    Runs inside the Toil container.

    Responsible for setting up the user script and running the command for the
    job (which may in turn invoke the Toil worker entrypoint).
    """

    configure_root_logger()
    set_log_level("DEBUG")
    logger.debug("Starting executor")

    # If we don't manage to run the child, what should our exit code be?
    exit_code = EXIT_STATUS_UNAVAILABLE_VALUE

    if len(sys.argv) != 2:
        logger.error('Executor requires exactly one base64-encoded argument')
        sys.exit(exit_code)

    # Take in a base64-encoded pickled dict as our first argument and decode it
    try:
        # Make sure to encode the text arguments to bytes before base 64 decoding
        job = pickle.loads(base64.b64decode(sys.argv[1].encode('utf-8')))
    except:
        exc_info = sys.exc_info()
        logger.error('Exception while unpickling task', exc_info=exc_info)
        sys.exit(exit_code)

    if 'environment' in job:
        # Adopt the job environment into the executor.
        # This lets us use things like TOIL_WORKDIR when figuring out how to talk to other executors.
        logger.debug('Adopting environment: %s', str(job['environment'].keys()))
        for var, value in job['environment'].items():
            os.environ[var] = value

    # Set JTRES_ROOT and other global state needed for resource
    # downloading/deployment to work.
    # TODO: Every worker downloads resources independently.
    # We should have a way to share a resource directory.
    logger.debug('Preparing system for resource download')
    Resource.prepareSystem()
    try:
        if 'userScript' in job:
            job['userScript'].register()

        # Start the child process
        logger.debug("Invoking command: '%s'", job['command'])
        child = subprocess.Popen(job['command'],
                                 preexec_fn=lambda: os.setpgrp(),
                                 shell=True)

        # Reproduce child's exit code
        exit_code = child.wait()

    finally:
        logger.debug('Cleaning up resources')
        # TODO: Change resource system to use a shared resource directory for everyone.
        # Then move this into worker cleanup somehow
        Resource.cleanSystem()
        logger.debug('Shutting down')
        sys.exit(exit_code)
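
# A minimal sketch of how a payload for executor() is built, mirroring what
# _create_pod_spec() above ships to the container. It assumes Toil is
# installed so that the _toil_kubernetes_executor entrypoint is on PATH; the
# command shown is illustrative only:
#
#     payload = base64.b64encode(
#         pickle.dumps({'command': 'true', 'environment': {}},
#                      pickle.HIGHEST_PROTOCOL)).decode('utf-8')
#     subprocess.run(['_toil_kubernetes_executor', payload], check=True)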