1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
import os
import re
import shlex
import subprocess
from io import StringIO

from dotenv import dotenv_values
from ruamel.yaml import YAML

from ..utils.command import Command, default_bin
from ..compat import _ensure_path
28
29
def flatten(node, parents=None):
    """
    Walk a nested hierarchy and yield ``(name, ancestors)`` pairs.

    Strings are leaves, lists hold sibling nodes, and in a dict every key is
    yielded itself before its value is walked with that key appended to the
    ancestor chain. Any other node type raises ``TypeError``.
    """
    lineage = [] if not parents else list(parents)

    if isinstance(node, str):
        yield node, lineage
        return

    if isinstance(node, list):
        for child in node:
            yield from flatten(child, parents=lineage)
        return

    if not isinstance(node, dict):
        raise TypeError(node)

    for name, children in node.items():
        yield name, lineage
        yield from flatten(children, parents=[*lineage, name])
43
44
45def _sanitize_command(cmd):
46    if isinstance(cmd, list):
47        cmd = " ".join(cmd)
48    return re.sub(r"\s+", " ", cmd)
49
50
class UndefinedImage(Exception):
    """Raised when a requested service is missing from the compose config."""
53
54
class ComposeConfig:
    """
    Validated view of a docker-compose configuration file.

    On construction the dotenv file is read and merged with the process
    environment and the explicitly passed parameters. The compose file is
    then checked against its ``x-hierarchy`` / ``x-with-gpus`` extensions
    and against ``docker-compose config``, whose rendered output is stored
    in ``self.config``.
    """

    def __init__(self, config_path, dotenv_path, compose_bin, params=None):
        """
        Parameters
        ----------
        config_path : path
            Location of the docker-compose.yml file (normalized with
            ``_ensure_path``).
        dotenv_path : path or None
            Location of the dotenv file; when falsy, ``.env`` next to
            ``config_path`` is used.
        compose_bin : str
            docker-compose executable used to validate the configuration;
            falls back to ``docker-compose`` when falsy.
        params : dict, optional
            Overrides for dotenv values; keys not present in the dotenv file
            are dropped.

        Raises
        ------
        ValueError
            If the compose configuration fails validation.
        """
        config_path = _ensure_path(config_path)
        if dotenv_path:
            dotenv_path = _ensure_path(dotenv_path)
        else:
            dotenv_path = config_path.parent / '.env'
        self._read_env(dotenv_path, params)
        self._read_config(config_path, compose_bin)

    def _read_env(self, dotenv_path, params):
        """
        Read .env and merge it with explicitly passed parameters.

        Sets ``self.dotenv`` (raw dotenv values), ``self.params`` (overrides
        restricted to keys defined in the dotenv file) and ``self.env`` (the
        environment docker-compose is invoked with).
        """
        self.dotenv = dotenv_values(str(dotenv_path))
        if params is None:
            self.params = {}
        else:
            self.params = {k: v for k, v in params.items() if k in self.dotenv}

        # forward the process' environment variables
        self.env = os.environ.copy()
        # set the defaults from the dotenv files; python-dotenv reports a
        # variable declared without a value as None, which is not a valid
        # subprocess environment value, so such entries are skipped
        self.env.update(
            {k: v for k, v in self.dotenv.items() if v is not None}
        )
        # override the defaults passed as parameters
        self.env.update(self.params)

        # translate docker's architecture notation to a more widely used one
        arch = self.env.get('ARCH', 'amd64')
        arch_aliases = {
            'amd64': 'x86_64',
            'arm64v8': 'aarch64',
            's390x': 's390x'
        }
        arch_short_aliases = {
            'amd64': 'x64',
            'arm64v8': 'arm64',
            's390x': 's390x'
        }
        self.env['ARCH_ALIAS'] = arch_aliases.get(arch, arch)
        self.env['ARCH_SHORT_ALIAS'] = arch_short_aliases.get(arch, arch)

    def _read_config(self, config_path, compose_bin):
        """
        Validate and read the docker-compose.yml

        All problems found (inconsistent ``x-hierarchy`` / ``x-with-gpus``
        entries and docker-compose's own validation errors) are collected
        and reported together in a single ``ValueError``.
        """
        yaml = YAML()
        with config_path.open() as fp:
            config = yaml.load(fp)

        services = config['services'].keys()
        self.hierarchy = dict(flatten(config.get('x-hierarchy', {})))
        self.limit_presets = config.get('x-limit-presets', {})
        self.with_gpus = config.get('x-with-gpus', [])
        nodes = self.hierarchy.keys()
        errors = []

        for name in self.with_gpus:
            if name not in services:
                errors.append(
                    'Service `{}` defined in `x-with-gpus` but not in '
                    '`services`'.format(name)
                )
        for name in nodes - services:
            errors.append(
                'Service `{}` is defined in `x-hierarchy` but not in '
                '`services`'.format(name)
            )
        for name in services - nodes:
            errors.append(
                'Service `{}` is defined in `services` but not in '
                '`x-hierarchy`'.format(name)
            )

        # trigger docker-compose's own validation with the configured binary
        compose = Command(compose_bin or 'docker-compose')
        args = ['--file', str(config_path), 'config']
        result = compose.run(*args, env=self.env, check=False,
                             stderr=subprocess.PIPE, stdout=subprocess.PIPE)

        if result.returncode != 0:
            # report docker-compose's error output line by line
            errors += result.stderr.decode().splitlines()

        if errors:
            msg = '\n'.join([' - {}'.format(msg) for msg in errors])
            raise ValueError(
                'Found errors with docker-compose:\n{}'.format(msg)
            )

        # keep the configuration as rendered by `docker-compose config`
        rendered_config = StringIO(result.stdout.decode())
        self.path = config_path
        self.config = yaml.load(rendered_config)

    def get(self, service_name):
        """
        Return the rendered compose definition of ``service_name``.

        The service mapping is updated in place with ``name``, ``need_gpu``
        (whether it is listed in ``x-with-gpus``) and ``ancestors`` (its
        parents according to ``x-hierarchy``).

        Raises
        ------
        UndefinedImage
            If the service is not defined in the rendered configuration.
        """
        try:
            service = self.config['services'][service_name]
        except KeyError:
            raise UndefinedImage(service_name)
        service['name'] = service_name
        service['need_gpu'] = service_name in self.with_gpus
        service['ancestors'] = self.hierarchy[service_name]
        return service

    def __getitem__(self, service_name):
        return self.get(service_name)
162
163
class Docker(Command):
    """Command wrapper for the plain ``docker`` command line client."""

    def __init__(self, docker_bin=None):
        # default_bin resolves the executable, with "docker" given as the
        # default name; only ``bin`` is set, Command.__init__ is not called
        executable = default_bin(docker_bin, "docker")
        self.bin = executable
168
169
class DockerCompose(Command):
    """
    Build, pull, run and push the services of a docker-compose file.

    Every operation can go through docker-compose (the default) or, where
    supported, through the plain docker CLI.
    """

    def __init__(self, config_path, dotenv_path=None, compose_bin=None,
                 params=None):
        compose_bin = default_bin(compose_bin, 'docker-compose')
        self.config = ComposeConfig(config_path, dotenv_path, compose_bin,
                                    params)
        self.bin = compose_bin
        # images already pulled by this instance, to avoid pulling twice
        self.pull_memory = set()

    def clear_pull_memory(self):
        """Forget which images have already been pulled."""
        self.pull_memory = set()

    @staticmethod
    def _format_mapping(mapping, template):
        """Render a mapping as sorted lines using a two-field template."""
        return '\n'.join(
            template.format(k, v) for k, v in sorted(mapping.items())
        )

    @staticmethod
    def _key_value_args(flag, mapping):
        """
        Expand ``mapping`` into repeated ``flag KEY=VALUE`` arguments.

        A ``None`` value yields a bare ``flag KEY`` instead of the literal
        ``KEY=None``.
        """
        args = []
        for key, value in mapping.items():
            item = key if value is None else '{}={}'.format(key, value)
            args.extend([flag, item])
        return args

    @staticmethod
    def _build_target_args(service):
        """
        Return the trailing ``[-f DOCKERFILE] -t IMAGE CONTEXT`` arguments of
        a docker build invocation for ``service``.

        ``-f`` is only passed when the service defines a dockerfile;
        otherwise docker's own default is used.
        """
        build = service['build']
        args = []
        if 'dockerfile' in build:
            args.extend(['-f', build['dockerfile']])
        args.extend(['-t', service['image'], build.get('context', '.')])
        return args

    @staticmethod
    def _compose_command_args(service):
        """
        Return the service's compose ``command`` as separate argv elements.

        A list command is used element by element; a string command is
        tokenized with shell-like rules so that e.g. ``/bin/bash -c "..."``
        reaches ``docker run`` as three arguments instead of one executable
        name.
        """
        cmd = service.get('command')
        if not cmd:
            return []
        if isinstance(cmd, list):
            return [str(part) for part in cmd]
        return shlex.split(cmd)

    def _resource_limit_args(self, resource_limit):
        """
        Return docker run arguments for the ``x-limit-presets`` entry
        named ``resource_limit``.

        Raises
        ------
        ValueError
            If no such preset is defined (or it is empty).
        """
        limits = self.config.limit_presets.get(resource_limit)
        if not limits:
            raise ValueError(
                f"Unknown resource limit preset '{resource_limit}'")
        args = []
        cpuset = limits.get('cpuset_cpus', [])
        if cpuset:
            args.append(f'--cpuset-cpus={",".join(map(str, cpuset))}')
        memory = limits.get('memory')
        if memory:
            args.append(f'--memory={memory}')
            args.append(f'--memory-swap={memory}')
        return args

    def _execute_compose(self, *args, **kwargs):
        """
        Run a docker-compose subcommand against the configured compose file.

        Raises
        ------
        RuntimeError
            If the command fails; the message lists the dotenv defaults and
            the explicitly passed parameters.
        """
        try:
            result = super().run('--file', str(self.config.path), *args,
                                 env=self.config.env, **kwargs)
            result.check_returncode()
        except subprocess.CalledProcessError as e:
            msg = (
                "`{cmd}` exited with a non-zero exit code {code}, see the "
                "process log above.\n\nThe docker-compose command was "
                "invoked with the following parameters:\n\nDefaults defined "
                "in .env:\n{dotenv}\n\nArchery was called with:\n{params}"
            )
            raise RuntimeError(
                msg.format(
                    cmd=' '.join(e.cmd),
                    code=e.returncode,
                    dotenv=self._format_mapping(
                        self.config.dotenv, template='  {}: {}'
                    ),
                    params=self._format_mapping(
                        self.config.params, template='  export {}={}'
                    )
                )
            )

    def _execute_docker(self, *args, **kwargs):
        """
        Run a plain docker CLI command.

        Raises
        ------
        RuntimeError
            If the command fails. The message contains the full command line,
            so never route credentials through this helper.
        """
        try:
            result = Docker().run(*args, **kwargs)
            result.check_returncode()
        except subprocess.CalledProcessError as e:
            raise RuntimeError(
                "{} exited with non-zero exit code {}".format(
                    ' '.join(e.cmd), e.returncode
                )
            )

    def pull(self, service_name, pull_leaf=True, using_docker=False):
        """
        Pull the images of ``service_name``'s ancestors and, if
        ``pull_leaf``, of the service itself. Images pulled earlier by this
        instance are skipped.
        """
        def _pull(service):
            image = service['image']
            if image in self.pull_memory:
                return

            if using_docker:
                try:
                    self._execute_docker('pull', image)
                except Exception as e:
                    # best effort, mirroring --ignore-pull-failures
                    print(e)
            else:
                self._execute_compose('pull', '--ignore-pull-failures',
                                      service['name'])

            self.pull_memory.add(image)

        service = self.config.get(service_name)
        for ancestor in service['ancestors']:
            _pull(self.config.get(ancestor))
        if pull_leaf:
            _pull(service)

    def build(self, service_name, use_cache=True, use_leaf_cache=True,
              using_docker=False, using_buildx=False, pull_parents=True):
        """
        Build ``service_name`` after building each of its ancestors.

        ``use_cache`` applies to every image; the leaf image additionally
        requires ``use_leaf_cache``. Services without a ``build`` section are
        skipped.
        """
        def _build(service, use_cache):
            if 'build' not in service:
                # nothing to do
                return

            build = service['build']
            args = []
            cache_images = list(build.get('cache_from', []))
            if pull_parents:
                for image in cache_images:
                    if image not in self.pull_memory:
                        try:
                            self._execute_docker('pull', image)
                        except Exception as e:
                            # best effort: a missing cache image is not fatal
                            print(e)
                        finally:
                            self.pull_memory.add(image)

            if not use_cache:
                args.append('--no-cache')

            # turn on inline build cache, this is a docker buildx feature
            # used to bundle the image build cache to the pushed image manifest
            # so the build cache can be reused across hosts, documented at
            # https://github.com/docker/buildx#--cache-tonametypetypekeyvalue
            if self.config.env.get('BUILDKIT_INLINE_CACHE') == '1':
                args.extend(['--build-arg', 'BUILDKIT_INLINE_CACHE=1'])

            if using_buildx:
                args.extend(
                    self._key_value_args('--build-arg', build.get('args', {}))
                )
                if use_cache:
                    cache_ref = '{}-cache'.format(service['image'])
                    args.extend([
                        '--cache-from',
                        'type=registry,ref={}'.format(cache_ref),
                        '--cache-to',
                        'type=registry,ref={},mode=max'.format(cache_ref),
                    ])
                args.extend(['--output', 'type=docker'])
                args.extend(self._build_target_args(service))
                self._execute_docker("buildx", "build", *args)
            elif using_docker:
                # better for caching
                args.extend(
                    self._key_value_args('--build-arg', build.get('args', {}))
                )
                # no shell is involved, so pass the image name unquoted
                for image in cache_images:
                    args.extend(['--cache-from', image])
                args.extend(self._build_target_args(service))
                self._execute_docker("build", *args)
            else:
                self._execute_compose("build", *args, service['name'])

        service = self.config.get(service_name)
        # build ancestor services
        for ancestor in service['ancestors']:
            _build(self.config.get(ancestor), use_cache=use_cache)
        # build the leaf/target service
        _build(service, use_cache=use_cache and use_leaf_cache)

    def run(self, service_name, command=None, *, env=None, volumes=None,
            user=None, using_docker=False, resource_limit=None):
        """
        Run ``service_name`` in a throw-away container.

        Plain ``docker run`` is used when ``using_docker`` is set, the service
        needs a GPU, or a ``resource_limit`` preset is requested; otherwise
        ``docker-compose run`` is used. ``command`` overrides the service's
        compose command.
        """
        service = self.config.get(service_name)

        args = []
        if user is not None:
            args.extend(['-u', user])

        if env is not None:
            args.extend(self._key_value_args('-e', env))

        if volumes is not None:
            for volume in volumes:
                args.extend(['--volume', volume])

        if not (using_docker or service['need_gpu'] or resource_limit):
            # execute as a docker-compose command
            args.append(service_name)
            if command is not None:
                args.append(command)
            self._execute_compose('run', '--rm', *args)
            return

        # use gpus, requires docker>=19.03
        if service['need_gpu']:
            args.extend(['--gpus', 'all'])

        if service.get('shm_size'):
            # argv elements must be strings
            args.extend(['--shm-size', str(service['shm_size'])])

        # append env variables from the compose conf
        args.extend(self._key_value_args('-e', service.get('environment', {})))

        # append volumes from the compose conf
        for v in service.get('volumes', []):
            if not isinstance(v, str):
                # if not the compact string volume definition
                v = "{}:{}".format(v['source'], v['target'])
            args.extend(['-v', v])

        # infer whether an interactive shell is desired or not
        if command in ['cmd.exe', 'bash', 'sh', 'powershell']:
            args.append('-it')

        if resource_limit:
            args.extend(self._resource_limit_args(resource_limit))

        # get the actual docker image name instead of the compose service
        # name which we refer as image in general
        args.append(service['image'])

        # add command from compose if it wasn't overridden
        if command is not None:
            args.append(command)
        else:
            args.extend(self._compose_command_args(service))

        # execute as a plain docker cli command
        self._execute_docker('run', '--rm', *args)

    def push(self, service_name, user=None, password=None, using_docker=False):
        """
        Push the images of ``service_name``'s ancestors and the service.

        When ``user`` is given, ``docker login`` runs first; without a
        ``password`` docker login is left to ask for it.

        Raises
        ------
        RuntimeError
            If the login fails (credentials are not included in the error).
        """
        def _push(service):
            if using_docker:
                return self._execute_docker('push', service['image'])
            else:
                return self._execute_compose('push', service['name'])

        if user is not None:
            login_args = ['login', '-u', user]
            if password is not None:
                login_args.extend(['-p', password])
            # TODO(kszucs): have an option for a prompt
            try:
                # call docker directly rather than through _execute_docker:
                # that helper puts the whole command line, password included,
                # into its error message
                Docker().run(*login_args).check_returncode()
            except subprocess.CalledProcessError:
                # hide credentials
                msg = ('Failed to push `{}`, check the passed credentials'
                       .format(service_name))
                raise RuntimeError(msg) from None

        service = self.config.get(service_name)
        for ancestor in service['ancestors']:
            _push(self.config.get(ancestor))
        _push(service)

    def images(self):
        """Return the sorted names of all services in ``x-hierarchy``."""
        return sorted(self.config.hierarchy.keys())
418