1# Copyright (C) 2015-2021 Regents of the University of California
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import base64
15import logging
16import os
17import re
18import struct
19from shlex import quote
20
21import requests
22
23import docker
24from docker.errors import (ContainerError,
25                           ImageNotFound,
26                           NotFound,
27                           create_api_error_from_http_exception)
28from docker.utils.socket import consume_socket_output, demux_adaptor
29
logger = logging.getLogger(__name__)

# Values accepted by apiDockerCall's deferParam: leave the container alone
# (FORGO), stop it gracefully (STOP), or kill and remove it (RM).
FORGO, STOP, RM = 0, 1, 2
35
36
def dockerCheckOutput(*args, **kwargs):
    """Removed legacy entry point; always raises RuntimeError pointing at apiDockerCall()."""
    message = ("dockerCheckOutput() using subprocess.check_output() has been removed, "
               "please switch to apiDockerCall().")
    raise RuntimeError(message)
40
41
def dockerCall(*args, **kwargs):
    """Removed legacy entry point; always raises RuntimeError pointing at apiDockerCall()."""
    message = ("dockerCall() using subprocess.check_output() has been removed, "
               "please switch to apiDockerCall().")
    raise RuntimeError(message)
45
46
def subprocessDockerCall(*args, **kwargs):
    """Removed legacy entry point; always raises RuntimeError pointing at apiDockerCall()."""
    message = ("subprocessDockerCall() has been removed, "
               "please switch to apiDockerCall().")
    raise RuntimeError(message)
50
51
def apiDockerCall(job,
                  image,
                  parameters=None,
                  deferParam=None,
                  volumes=None,
                  working_dir=None,
                  containerName=None,
                  entrypoint=None,
                  detach=False,
                  log_config=None,
                  auto_remove=None,
                  remove=False,
                  user=None,
                  environment=None,
                  stdout=None,
                  stderr=False,
                  stream=False,
                  demux=False,
                  streamfile=None,
                  timeout=365 * 24 * 60 * 60,
                  **kwargs):
    """
    A toil wrapper for the python docker API.

    Docker API Docs: https://docker-py.readthedocs.io/en/stable/index.html
    Docker API Code: https://github.com/docker/docker-py

    This implements docker's python API within toil so that calls are run as
    jobs, with the intention that failed/orphaned docker jobs be handled
    appropriately.

    Example of using apiDockerCall in toil to index a FASTA file with SAMtools:
    def toil_job(job):
        working_dir = job.fileStore.getLocalTempDir()
        path = job.fileStore.readGlobalFile(ref_id,
                                            os.path.join(working_dir, 'ref.fasta'))
        parameters = ['faidx', path]
        apiDockerCall(job,
                      image='quay.io/ucsc_cgl/samtools:latest',
                      working_dir=working_dir,
                      parameters=parameters)

    Note that when run with detach=False, or with detach=True and stdout=True
    or stderr=True, this is a blocking call. When run with detach=True and
    without output capture, the container is started and returned without
    waiting for it to finish.

    :param toil.Job.job job: The Job instance for the calling function.
    :param str image: Name of the Docker image to be used.
                     (e.g. 'quay.io/ucsc_cgl/samtools:latest')
    :param list[str] parameters: A list (or tuple) of string elements. Each
                                 element is shell-quoted and the elements are
                                 joined with spaces into a single command
                                 string.  This handling of multiple elements
                                 provides backwards compatibility with previous
                                 versions which called docker using
                                 subprocess.check_call().
                                 If a list of lists (list[list[str]]), each
                                 inner list is one command and the commands are
                                 chained with pipes; unless an entrypoint is
                                 given they are run via '/bin/bash -c' with
                                 'set -eo pipefail'.
                                 If None or empty, no command is sent AND the
                                 entrypoint argument is discarded (both are
                                 passed to docker as None).
    :param int deferParam: Action to take on the container upon job completion.
           FORGO (0) leaves the container untouched and running.
           STOP (1) Sends SIGTERM, then SIGKILL if necessary to the container.
           RM (2) Immediately send SIGKILL to the container. This is the default
           behavior if deferParam is set to None.
    :param volumes: Volumes to mount, in any form docker-py accepts: a dict
                    mapping host paths to {'bind': ..., 'mode': ...}, or a list
                    of 'host:container[:mode]' strings. Missing host-side
                    directories are created. Defaults to mounting working_dir
                    at /data read-write.
    :param str working_dir: The working directory. Defaults to the current
                            working directory; made absolute before use.
    :param str containerName: The name of the container. Defaults to a random
                              name derived from the job (see getContainerName).
    :param str entrypoint: Prepends commands sent to the container.  See:
                      https://docker-py.readthedocs.io/en/stable/containers.html
    :param bool detach: Run the container in detached mode. (equivalent to '-d')
    :param bool stdout: Return logs from STDOUT when detach=False (default: True).
                        Block and capture stdout to a file when detach=True
                        (default: False). Output capture defaults to output.log,
                        and can be specified with the "streamfile" kwarg.
    :param bool stderr: Return logs from STDERR when detach=False (default: False).
                        Block and capture stderr to a file when detach=True
                        (default: False). Output capture defaults to output.log,
                        and can be specified with the "streamfile" kwarg.
    :param bool stream: If True and detach=False, return a log generator instead
                        of a string. Ignored if detach=True. (default: False).
    :param bool demux: Similar to `demux` in container.exec_run(). If True and
                       detach=False, returns a tuple of (stdout, stderr). If
                       stream=True, returns a log generator with tuples of
                       (stdout, stderr). Ignored if detach=True. (default: False).
    :param str streamfile: Collect container output to this file if detach=True and
                        stderr and/or stdout are True. Defaults to "output.log".
    :param dict log_config: Specify the logs to return from the container.  See:
                      https://docker-py.readthedocs.io/en/stable/containers.html
                      When detach=True and output capture is requested without
                      a log_config, it defaults to {'type': 'journald'}.
    :param bool auto_remove: Passed to docker as auto_remove. Defaults to the
                             value of `remove`.
    :param bool remove: Remove the container on exit or not.
    :param str user: The container will be run with the privileges of
                     the user specified.  Can be an actual name, such
                     as 'root' or 'lifeisaboutfishtacos', or it can be
                     the uid or gid of the user ('0' is root; '1000' is
                     an example of a less privileged uid or gid), or a
                     complement of the uid:gid (RECOMMENDED), such as
                     '0:0' (root user : root group) or '1000:1000'
                     (some other user : some other user group).
                     Defaults to the calling process's "uid:gid".
    :param environment: Environment variables to set inside the container,
                        passed through to docker-py (e.g. {'FOO': 'bar'}).
    :param int timeout: Use the given timeout in seconds for interactions with
                        the Docker daemon. Note that the underlying docker module is
                        not always able to abort ongoing reads and writes in order
                        to respect the timeout. Defaults to 1 year (i.e. wait
                        essentially indefinitely).
    :param kwargs: Additional keyword arguments supplied to the docker API's
                   run command.  The list is 75 keywords total, for examples
                   and full documentation see:
                   https://docker-py.readthedocs.io/en/stable/containers.html

    :returns: Returns the standard output/standard error text, as requested, when
              detach=False. Returns the underlying
              docker.models.containers.Container object from the Docker API when
              detach=True.
    """

    # make certain that files have the correct permissions
    thisUser = os.getuid()
    thisGroup = os.getgid()
    if user is None:
        user = str(thisUser) + ":" + str(thisGroup)

    if containerName is None:
        containerName = getContainerName(job)

    if working_dir is None:
        working_dir = os.getcwd()

    if volumes is None:
        volumes = {working_dir: {'bind': '/data', 'mode': 'rw'}}

    # Make sure every host-side mount point exists. docker-py accepts either a
    # dict keyed by host path or a list of "host:container[:mode]" strings; in
    # the list form only the part before the first ':' is a local path, so we
    # must not create a directory named after the whole mount spec.
    for volume in volumes:
        host_path = volume if isinstance(volumes, dict) else volume.split(':', 1)[0]
        if host_path and not os.path.exists(host_path):
            os.makedirs(host_path, exist_ok=True)

    if parameters is None:
        parameters = []

    # If 'parameters' is a list (or tuple) of lists, treat each inner list as a
    # separate command and chain them with pipes.
    if len(parameters) > 0 and isinstance(parameters[0], (list, tuple)):
        if entrypoint is None:
            entrypoint = ['/bin/bash', '-c']
        chain_params = \
            [' '.join((quote(arg) for arg in command)) \
             for command in parameters]
        command = ' | '.join(chain_params)
        pipe_prefix = "set -eo pipefail && "
        command = [pipe_prefix + command]
        logger.debug("Calling docker with: " + repr(command))

    # If 'parameters' is a flat list/tuple, join all elements into a single
    # string, quoting and escaping each element.
    # Example: ['echo','the Oread'] becomes: "echo 'the Oread'"
    # The docker API accepts the command as a string or a list:
    # http://docker-py.readthedocs.io/en/stable/containers.html
    elif len(parameters) > 0 and isinstance(parameters, (list, tuple)):
        command = ' '.join((quote(arg) for arg in parameters))
        logger.debug("Calling docker with: " + repr(command))

    # If 'parameters' is empty, both command and entrypoint are sent as None,
    # which tells the API to simply create and run the container with the
    # image's defaults.
    # NOTE(review): this discards a caller-supplied entrypoint; kept for
    # backwards compatibility.
    else:
        entrypoint = None
        command = None

    working_dir = os.path.abspath(working_dir)

    # Ensure the user has passed a valid value for deferParam
    assert deferParam in (None, FORGO, STOP, RM), \
        'Please provide a valid value for deferParam.'

    client = docker.from_env(version='auto', timeout=timeout)

    if deferParam is None:
        deferParam = RM

    if deferParam == STOP:
        job.defer(dockerStop, containerName)

    if deferParam == FORGO:
        # Leave the container untouched and running
        pass
    elif deferParam == RM:
        job.defer(dockerKill, containerName, remove=True)
    elif remove:
        # deferParam == STOP here: also remove the container once stopped.
        job.defer(dockerKill, containerName, remove=True)

    if auto_remove is None:
        auto_remove = remove

    try:
        if detach is False:
            # When detach is False, this returns stdout normally:
            # >>> client.containers.run("ubuntu:latest", "echo hello world")
            # 'hello world\n'
            if stdout is None:
                stdout = True
            out = client.containers.run(image=image,
                                        command=command,
                                        working_dir=working_dir,
                                        entrypoint=entrypoint,
                                        name=containerName,
                                        detach=False,
                                        volumes=volumes,
                                        auto_remove=auto_remove,
                                        stdout=stdout,
                                        stderr=stderr,
                                        # to get the generator if demux=True
                                        stream=stream or demux,
                                        remove=remove,
                                        log_config=log_config,
                                        user=user,
                                        environment=environment,
                                        **kwargs)

            if demux is False:
                return out

            # If demux is True (i.e.: we want STDOUT and STDERR separated), we need to decode
            # the raw response from the docker API and preserve the stream type this time.
            response = out._response
            gen = (demux_adaptor(*frame) for frame in _multiplexed_response_stream_helper(response))

            if stream:
                return gen
            else:
                return consume_socket_output(frames=gen, demux=True)

        else:
            if (stdout or stderr) and log_config is None:
                logger.warning('stdout or stderr specified, but log_config is not set.  '
                               'Defaulting to "journald".')
                log_config = dict(type='journald')

            if stdout is None:
                stdout = False

            # When detach is True, this returns a container object:
            # >>> client.containers.run("bfirsh/reticulate-splines", detach=True)
            # <Container '45e6d2de7c54'>
            container = client.containers.run(image=image,
                                              command=command,
                                              working_dir=working_dir,
                                              entrypoint=entrypoint,
                                              name=containerName,
                                              detach=True,
                                              volumes=volumes,
                                              auto_remove=auto_remove,
                                              stdout=stdout,
                                              stderr=stderr,
                                              stream=stream,
                                              remove=remove,
                                              log_config=log_config,
                                              user=user,
                                              environment=environment,
                                              **kwargs)
            if stdout or stderr:
                if streamfile is None:
                    streamfile = 'output.log'
                with open(streamfile, 'wb') as f:
                    # stream=True makes this loop blocking; we will loop until
                    # the container stops and there is no more output.
                    for line in container.logs(stdout=stdout, stderr=stderr, stream=True):
                        f.write(line.encode() if isinstance(line, str) else line)

            # If we didn't capture output, the caller will need to .wait() on
            # the container to know when it is done. Even if we did capture
            # output, the caller needs the container to get at the exit code.
            return container

    except ContainerError:
        logger.error("Docker had non-zero exit.  Check your command: " + repr(command))
        raise
    except ImageNotFound:
        logger.error("Docker image not found.")
        raise
    except requests.exceptions.HTTPError as e:
        logger.error("The server returned an error.")
        raise create_api_error_from_http_exception(e)
330
331
def dockerKill(container_name: str,
               gentleKill: bool = False,
               remove: bool = False,
               timeout: int = 365 * 24 * 60 * 60) -> None:
    """
    Immediately kills a container.  Equivalent to "docker kill":
    https://docs.docker.com/engine/reference/commandline/kill/

    While the container reports status 'running', it is repeatedly killed (or
    stopped, if gentleKill is True) and its status re-read. A container that
    does not exist is logged at debug level and otherwise ignored.

    :param container_name: Name of the container being killed.
    :param gentleKill: If True, trigger a graceful shutdown (container.stop())
                       instead of container.kill().
    :param remove: If True, remove the container after it exits.
    :param int timeout: Use the given timeout in seconds for interactions with
                        the Docker daemon. Note that the underlying docker module is
                        not always able to abort ongoing reads and writes in order
                        to respect the timeout. Defaults to 1 year (i.e. wait
                        essentially indefinitely).
    :raises docker.errors.APIError: If the Docker daemon returns an error other
                                    than "not found".
    """
    client = docker.from_env(version='auto', timeout=timeout)
    try:
        this_container = client.containers.get(container_name)
        while this_container.status == 'running':
            if gentleKill is False:
                this_container.kill()
            else:
                this_container.stop()
            # The Container object's status is a snapshot; re-fetch it to see
            # whether the kill/stop has taken effect.
            this_container = client.containers.get(container_name)
        if remove:
            this_container.remove()
    except NotFound:
        logger.debug("Attempted to stop container (%s), but container does not exist.",
                     container_name)
    except requests.exceptions.HTTPError as e:
        logger.debug("Attempted to stop container (%s), but server gave an error:",
                     container_name)
        raise create_api_error_from_http_exception(e)
365
366
def dockerStop(container_name: str,
               remove: bool = False,
               timeout: int = 365 * 24 * 60 * 60) -> None:
    """
    Gracefully kills a container.  Equivalent to "docker stop":
    https://docs.docker.com/engine/reference/commandline/stop/

    Delegates to dockerKill with gentleKill=True.

    :param container_name: Name of the container being stopped.
    :param remove: If True, remove the container after it exits.
    :param int timeout: Timeout in seconds for interactions with the Docker
                        daemon, forwarded to dockerKill. Defaults to 1 year
                        (i.e. wait essentially indefinitely).
    """
    dockerKill(container_name, gentleKill=True, remove=remove, timeout=timeout)
376
377
def containerIsRunning(container_name: str, timeout: int = 365 * 24 * 60 * 60):
    """
    Checks whether the container is running or not.

    :param container_name: Name of the container being checked.
    :param int timeout: Use the given timeout in seconds for interactions with
                        the Docker daemon. Note that the underlying docker module is
                        not always able to abort ongoing reads and writes in order
                        to respect the timeout. Defaults to 1 year (i.e. wait
                        essentially indefinitely).
    :returns: True if status is 'running', False if status is anything else,
              and None if the container does not exist.
    :raises docker.errors.APIError: If the Docker daemon returns an error other
                                    than "not found".
    """
    client = docker.from_env(version='auto', timeout=timeout)
    try:
        this_container = client.containers.get(container_name)
        # Any other status ('exited', 'restarting', 'paused', ...) counts as
        # not running.
        return this_container.status == 'running'
    except NotFound:
        return None
    except requests.exceptions.HTTPError as e:
        # The format string needs a placeholder for the extra argument;
        # otherwise the logging module reports a formatting error instead of
        # emitting the message.
        logger.debug("Server error attempting to call container: %s",
                     container_name)
        raise create_api_error_from_http_exception(e)
405
406
def getContainerName(job):
    """
    Build a unique container name for ``job``.

    The name is "toil--<job description>--<random token>", where the token is
    12 URL-safe base64 characters from 9 random bytes. All characters outside
    [a-zA-Z0-9_.-] are then removed, so the result starts with "toil" and
    matches [a-zA-Z0-9][a-zA-Z0-9_.-]*.
    """
    label = str(job.description)
    token = base64.b64encode(os.urandom(9), altchars=b'-_').decode('utf-8')
    raw_name = '--'.join(('toil', label, token))
    return re.sub('[^a-zA-Z0-9_.-]', '', raw_name)
415
416
417def _multiplexed_response_stream_helper(response):
418    """
419    A generator of multiplexed data blocks coming from a response stream modified from:
420    https://github.com/docker/docker-py/blob/4.3.1-release/docker/api/client.py#L370
421
422    :param response: requests.Response
423    :return: a generator with tuples of (stream_type, data)
424    """
425    while True:
426        header = response.raw.read(8)
427        if not header:
428            break
429        # header is 8 bytes with format: {STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}
430        # protocol: https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
431        stream_type, length = struct.unpack('>BxxxL', header)
432        if not length:
433            continue
434        data = response.raw.read(length)
435        if not data:
436            break
437        yield stream_type, data
438