# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
import os
import re
import struct
from shlex import quote

import requests

import docker
from docker.errors import (ContainerError,
                           ImageNotFound,
                           NotFound,
                           create_api_error_from_http_exception)
from docker.utils.socket import consume_socket_output, demux_adaptor

logger = logging.getLogger(__name__)

# Valid values for the ``deferParam`` argument of apiDockerCall().
FORGO = 0
STOP = 1
RM = 2


def dockerCheckOutput(*args, **kwargs):
    """Removed API; always raises RuntimeError pointing at apiDockerCall()."""
    raise RuntimeError("dockerCheckOutput() using subprocess.check_output() has been removed, "
                       "please switch to apiDockerCall().")


def dockerCall(*args, **kwargs):
    """Removed API; always raises RuntimeError pointing at apiDockerCall()."""
    raise RuntimeError("dockerCall() using subprocess.check_output() has been removed, "
                       "please switch to apiDockerCall().")


def subprocessDockerCall(*args, **kwargs):
    """Removed API; always raises RuntimeError pointing at apiDockerCall()."""
    raise RuntimeError("subprocessDockerCall() has been removed, "
                       "please switch to apiDockerCall().")


def _buildDockerCommand(parameters, entrypoint):
    """
    Translate the 'parameters' argument of apiDockerCall() into the
    (entrypoint, command) pair handed to the docker API.

    :param parameters: A list of strings (a single command), a list of lists
           of strings (commands to chain with pipes), or anything else/empty
           (no command).
    :param entrypoint: The caller-supplied entrypoint, or None.
    :returns: A tuple of (entrypoint, command).
    """
    # A list of lists: treat each inner list as a separate command and chain
    # them with pipes inside a single shell invocation.
    if len(parameters) > 0 and isinstance(parameters[0], list):
        if entrypoint is None:
            entrypoint = ['/bin/bash', '-c']
        piped = ' | '.join(' '.join(quote(arg) for arg in stage)
                           for stage in parameters)
        # pipefail makes the pipeline fail if any stage fails, not only the
        # last one.
        command = ["set -eo pipefail && " + piped]
        logger.debug("Calling docker with: %r", command)
        return entrypoint, command

    # A flat list: join all elements into a single string, quoting and
    # escaping each element.
    # Example: ['echo', 'the Oread'] becomes: "echo 'the Oread'"
    if len(parameters) > 0 and isinstance(parameters, list):
        command = ' '.join(quote(arg) for arg in parameters)
        logger.debug("Calling docker with: %r", command)
        return entrypoint, command

    # No usable parameters: both values are respecified as None, which tells
    # the API to simply create and run the container.
    return None, None


def apiDockerCall(job,
                  image,
                  parameters=None,
                  deferParam=None,
                  volumes=None,
                  working_dir=None,
                  containerName=None,
                  entrypoint=None,
                  detach=False,
                  log_config=None,
                  auto_remove=None,
                  remove=False,
                  user=None,
                  environment=None,
                  stdout=None,
                  stderr=False,
                  stream=False,
                  demux=False,
                  streamfile=None,
                  timeout=365 * 24 * 60 * 60,
                  **kwargs):
    """
    A toil wrapper for the python docker API.

    Docker API Docs: https://docker-py.readthedocs.io/en/stable/index.html
    Docker API Code: https://github.com/docker/docker-py

    This implements docker's python API within toil so that calls are run as
    jobs, with the intention that failed/orphaned docker jobs be handled
    appropriately.

    Example of using apiDockerCall in toil to index a FASTA file with SAMtools:
        def toil_job(job):
            working_dir = job.fileStore.getLocalTempDir()
            path = job.fileStore.readGlobalFile(ref_id,
                                                os.path.join(working_dir, 'ref.fasta'))
            parameters = ['faidx', path]
            apiDockerCall(job,
                          image='quay.io/ucsc_cgl/samtools:latest',
                          working_dir=working_dir,
                          parameters=parameters)

    Note that when run with detach=False, or with detach=True and stdout=True
    or stderr=True, this is a blocking call. When run with detach=True and
    without output capture, the container is started and returned without
    waiting for it to finish.

    :param toil.Job.job job: The Job instance for the calling function.
    :param str image: Name of the Docker image to be used.
                     (e.g. 'quay.io/ucsc_cgl/samtools:latest')
    :param list[str] parameters: A list of string elements. If there are
                                 multiple elements, these will be quoted and
                                 joined with spaces. This handling of multiple
                                 elements provides backwards compatibility with
                                 previous versions which called docker using
                                 subprocess.check_call().
                                 **If list of lists: list[list[str]], then treat
                                 as successive commands chained with pipe.
    :param int deferParam: Action to take on the container upon job completion.
           FORGO (0) leaves the container untouched and running.
           STOP (1) Sends SIGTERM, then SIGKILL if necessary to the container.
           RM (2) Immediately send SIGKILL to the container and remove it.
           This is the default behavior if deferParam is set to None.
    :param volumes: Volumes to mount, passed to the docker API. Defaults to
                    mounting working_dir read-write at '/data'. Each entry
                    iterated from this value is created as a host directory
                    if it does not exist yet.
    :param str working_dir: The working directory. Defaults to the current
                            working directory.
    :param str containerName: The name/ID of the container. Defaults to a
                              random name derived from the job.
    :param str entrypoint: Prepends commands sent to the container.  See:
                      https://docker-py.readthedocs.io/en/stable/containers.html
    :param bool detach: Run the container in detached mode. (equivalent to '-d')
    :param dict log_config: Specify the logs to return from the container.  See:
                      https://docker-py.readthedocs.io/en/stable/containers.html
    :param bool auto_remove: Passed to the docker API as ``auto_remove``.
                             Defaults to the value of ``remove``.
    :param bool remove: Remove the container on exit or not.
    :param str user: The container will be run with the privileges of
                     the user specified.  Can be an actual name, such
                     as 'root' or 'lifeisaboutfishtacos', or it can be
                     the uid or gid of the user ('0' is root; '1000' is
                     an example of a less privileged uid or gid), or a
                     complement of the uid:gid (RECOMMENDED), such as
                     '0:0' (root user : root group) or '1000:1000'
                     (some other user : some other user group).
                     Defaults to the uid:gid of the calling process.
    :param environment: Environment variables to set inside of the container;
                        passed unchanged to the docker API's ``environment``
                        argument.
    :param bool stdout: Return logs from STDOUT when detach=False (default: True).
                        Block and capture stdout to a file when detach=True
                        (default: False). Output capture defaults to output.log,
                        and can be specified with the "streamfile" kwarg.
    :param bool stderr: Return logs from STDERR when detach=False (default: False).
                        Block and capture stderr to a file when detach=True
                        (default: False). Output capture defaults to output.log,
                        and can be specified with the "streamfile" kwarg.
    :param bool stream: If True and detach=False, return a log generator instead
                        of a string. Ignored if detach=True. (default: False).
    :param bool demux: Similar to `demux` in container.exec_run(). If True and
                       detach=False, returns a tuple of (stdout, stderr). If
                       stream=True, returns a log generator with tuples of
                       (stdout, stderr). Ignored if detach=True. (default: False).
    :param str streamfile: Collect container output to this file if detach=True and
                        stderr and/or stdout are True. Defaults to "output.log".
    :param int timeout: Use the given timeout in seconds for interactions with
                        the Docker daemon. Note that the underlying docker module is
                        not always able to abort ongoing reads and writes in order
                        to respect the timeout. Defaults to 1 year (i.e. wait
                        essentially indefinitely).
    :param kwargs: Additional keyword arguments supplied to the docker API's
                   run command.  The list is 75 keywords total, for examples
                   and full documentation see:
                   https://docker-py.readthedocs.io/en/stable/containers.html

    :returns: Returns the standard output/standard error text, as requested, when
              detach=False. Returns the underlying
              docker.models.containers.Container object from the Docker API when
              detach=True.
    """

    # Make certain that files have the correct permissions by running as the
    # calling user unless told otherwise.
    if user is None:
        user = str(os.getuid()) + ":" + str(os.getgid())

    if containerName is None:
        containerName = getContainerName(job)

    if working_dir is None:
        working_dir = os.getcwd()

    if volumes is None:
        volumes = {working_dir: {'bind': '/data', 'mode': 'rw'}}

    # Make sure every host-side mount point exists before docker sees it.
    for volume in volumes:
        if not os.path.exists(volume):
            os.makedirs(volume, exist_ok=True)

    if parameters is None:
        parameters = []

    entrypoint, command = _buildDockerCommand(parameters, entrypoint)

    working_dir = os.path.abspath(working_dir)

    # Ensure the user has passed a valid value for deferParam
    assert deferParam in (None, FORGO, STOP, RM), \
        'Please provide a valid value for deferParam.'

    client = docker.from_env(version='auto', timeout=timeout)

    if deferParam is None:
        deferParam = RM

    if deferParam == STOP:
        job.defer(dockerStop, containerName)

    if deferParam == FORGO:
        # Leave the container untouched and running
        pass
    elif deferParam == RM:
        job.defer(dockerKill, containerName, remove=True)
    elif remove:
        job.defer(dockerKill, containerName, remove=True)

    if auto_remove is None:
        auto_remove = remove

    try:
        if detach is False:
            # When detach is False, this returns stdout normally:
            # >>> client.containers.run("ubuntu:latest", "echo hello world")
            # 'hello world\n'
            if stdout is None:
                stdout = True
            out = client.containers.run(image=image,
                                        command=command,
                                        working_dir=working_dir,
                                        entrypoint=entrypoint,
                                        name=containerName,
                                        detach=False,
                                        volumes=volumes,
                                        auto_remove=auto_remove,
                                        stdout=stdout,
                                        stderr=stderr,
                                        # to get the generator if demux=True
                                        stream=stream or demux,
                                        remove=remove,
                                        log_config=log_config,
                                        user=user,
                                        environment=environment,
                                        **kwargs)

            if demux is False:
                return out

            # If demux is True (i.e.: we want STDOUT and STDERR separated), we need to decode
            # the raw response from the docker API and preserve the stream type this time.
            response = out._response
            gen = (demux_adaptor(*frame)
                   for frame in _multiplexed_response_stream_helper(response))

            if stream:
                return gen
            return consume_socket_output(frames=gen, demux=True)

        else:
            if (stdout or stderr) and log_config is None:
                logger.warning('stdout or stderr specified, but log_config is not set. '
                               'Defaulting to "journald".')
                log_config = dict(type='journald')

            if stdout is None:
                stdout = False

            # When detach is True, this returns a container object:
            # >>> client.containers.run("bfirsh/reticulate-splines", detach=True)
            # <Container '45e6d2de7c54'>
            container = client.containers.run(image=image,
                                              command=command,
                                              working_dir=working_dir,
                                              entrypoint=entrypoint,
                                              name=containerName,
                                              detach=True,
                                              volumes=volumes,
                                              auto_remove=auto_remove,
                                              stdout=stdout,
                                              stderr=stderr,
                                              stream=stream,
                                              remove=remove,
                                              log_config=log_config,
                                              user=user,
                                              environment=environment,
                                              **kwargs)
            if stdout or stderr:
                if streamfile is None:
                    streamfile = 'output.log'
                with open(streamfile, 'wb') as f:
                    # stream=True makes this loop blocking; we will loop until
                    # the container stops and there is no more output.
                    for line in container.logs(stdout=stdout, stderr=stderr, stream=True):
                        f.write(line.encode() if isinstance(line, str) else line)

            # If we didn't capture output, the caller will need to .wait() on
            # the container to know when it is done. Even if we did capture
            # output, the caller needs the container to get at the exit code.
            return container

    except ContainerError:
        logger.error("Docker had non-zero exit.  Check your command: %r", command)
        raise
    except ImageNotFound:
        logger.error("Docker image not found.")
        raise
    except requests.exceptions.HTTPError as e:
        logger.error("The server returned an error.")
        raise create_api_error_from_http_exception(e)


def dockerKill(container_name: str,
               gentleKill: bool = False,
               remove: bool = False,
               timeout: int = 365 * 24 * 60 * 60) -> None:
    """
    Immediately kills a container.  Equivalent to "docker kill":
    https://docs.docker.com/engine/reference/commandline/kill/

    A container that does not exist is logged at debug level and otherwise
    ignored; other HTTP errors from the daemon are re-raised as docker API
    errors.

    :param container_name: Name of the container being killed.
    :param gentleKill: If True, trigger a graceful shutdown.
    :param remove: If True, remove the container after it exits.
    :param int timeout: Use the given timeout in seconds for interactions with
                        the Docker daemon. Note that the underlying docker module is
                        not always able to abort ongoing reads and writes in order
                        to respect the timeout. Defaults to 1 year (i.e. wait
                        essentially indefinitely).
    """
    client = docker.from_env(version='auto', timeout=timeout)
    try:
        this_container = client.containers.get(container_name)
        # Keep asking until the daemon no longer reports the container as
        # running; the status is only refreshed by fetching it again.
        while this_container.status == 'running':
            if gentleKill is False:
                this_container.kill()
            else:
                this_container.stop()
            this_container = client.containers.get(container_name)
        if remove:
            this_container.remove()
    except NotFound:
        logger.debug("Attempted to stop container (%s), but container does not exist.",
                     container_name)
    except requests.exceptions.HTTPError as e:
        logger.debug("Attempted to stop container (%s), but server gave an error:",
                     container_name)
        raise create_api_error_from_http_exception(e)


def dockerStop(container_name: str, remove: bool = False) -> None:
    """
    Gracefully kills a container.  Equivalent to "docker stop":
    https://docs.docker.com/engine/reference/commandline/stop/

    :param container_name: Name of the container being stopped.
    :param remove: If True, remove the container after it exits.
    """
    dockerKill(container_name, gentleKill=True, remove=remove)


def containerIsRunning(container_name: str, timeout: int = 365 * 24 * 60 * 60):
    """
    Checks whether the container is running or not.

    :param container_name: Name of the container being checked.
    :param int timeout: Use the given timeout in seconds for interactions with
                        the Docker daemon. Note that the underlying docker module is
                        not always able to abort ongoing reads and writes in order
                        to respect the timeout. Defaults to 1 year (i.e. wait
                        essentially indefinitely).
    :returns: True if status is 'running', False if status is anything else,
              and None if the container does not exist.
    """
    client = docker.from_env(version='auto', timeout=timeout)
    try:
        this_container = client.containers.get(container_name)
        # Any other status ('exited', 'restarting', 'paused', ...) counts as
        # not running.
        return this_container.status == 'running'
    except NotFound:
        return None
    except requests.exceptions.HTTPError as e:
        # The placeholder is required: without it logging cannot format the
        # extra argument and the container name is never reported.
        logger.debug("Server error attempting to call container: %s",
                     container_name)
        raise create_api_error_from_http_exception(e)


def getContainerName(job):
    """
    Create a random string including the job name, and return it.

    The result contains only characters from [a-zA-Z0-9_.-]; every other
    character of the job description is dropped.

    :param job: The job whose ``description`` is embedded in the name.
    :returns: The generated container name.
    """
    # 9 random bytes encode to exactly 12 base64 characters (no '=' padding);
    # '-' and '_' replace the '+' and '/' of the standard alphabet.
    random_suffix = base64.b64encode(os.urandom(9), b'-_').decode('utf-8')
    parts = ['toil', str(job.description), random_suffix]
    return re.sub(r'[^a-zA-Z0-9_.-]', '', '--'.join(parts))


def _multiplexed_response_stream_helper(response):
    """
    A generator of multiplexed data blocks coming from a response stream modified from:
    https://github.com/docker/docker-py/blob/4.3.1-release/docker/api/client.py#L370

    :param response: requests.Response
    :return: a generator with tuples of (stream_type, data)
    """
    while True:
        header = response.raw.read(8)
        if not header:
            break
        # header is 8 bytes with format: {STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}
        # protocol: https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
        stream_type, length = struct.unpack('>BxxxL', header)
        if not length:
            # Empty frame: nothing to yield, move on to the next header.
            continue
        data = response.raw.read(length)
        if not data:
            break
        yield stream_type, data