# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import re
import subprocess
from io import StringIO

from dotenv import dotenv_values
from ruamel.yaml import YAML

from ..utils.command import Command, default_bin
from ..compat import _ensure_path


def flatten(node, parents=None):
    """
    Walk a nested str/list/dict structure and yield `(name, parents)` pairs.

    Strings are leaves, lists are transparent containers and dictionaries
    introduce a new level: each key is yielded with the current parents and
    its value is traversed with the key appended to the parents.

    Raises
    ------
    TypeError
        If a node is neither a str, a list nor a dict.
    """
    # copy so the caller's list (and the lists already yielded) stay intact
    parents = list(parents or [])
    if isinstance(node, str):
        yield (node, parents)
    elif isinstance(node, list):
        for value in node:
            yield from flatten(value, parents=parents)
    elif isinstance(node, dict):
        for key, value in node.items():
            yield (key, parents)
            yield from flatten(value, parents=parents + [key])
    else:
        raise TypeError(node)


def _sanitize_command(cmd):
    """
    Return `cmd` as a single string with whitespace runs collapsed.

    A list command is joined with single spaces first.
    """
    if isinstance(cmd, list):
        cmd = " ".join(cmd)
    return re.sub(r"\s+", " ", cmd)


class UndefinedImage(Exception):
    """Raised when a requested service is missing from the compose config."""


class ComposeConfig:
    """
    Parsed and validated docker-compose configuration.

    Attributes set during construction:

    - `dotenv`: values read from the .env file
    - `params`: explicitly passed parameters which also appear in `dotenv`
    - `env`: environment used for the subprocess invocations
    - `hierarchy`: mapping of service name to the list of its ancestors
    - `limit_presets`: content of the `x-limit-presets` section
    - `with_gpus`: content of the `x-with-gpus` section
    - `path`: path of the compose file
    - `config`: the configuration as rendered by `docker-compose config`
    """

    def __init__(self, config_path, dotenv_path, compose_bin, params=None):
        config_path = _ensure_path(config_path)
        if dotenv_path:
            dotenv_path = _ensure_path(dotenv_path)
        else:
            # default to the .env file next to the compose file
            dotenv_path = config_path.parent / '.env'
        self._read_env(dotenv_path, params)
        self._read_config(config_path, compose_bin)

    def _read_env(self, dotenv_path, params):
        """
        Read .env and merge it with explicitly passed parameters.
        """
        self.dotenv = dotenv_values(str(dotenv_path))
        if params is None:
            self.params = {}
        else:
            # only parameters which have a default in .env are considered
            self.params = {k: v for k, v in params.items() if k in self.dotenv}

        # forward the process' environment variables
        self.env = os.environ.copy()
        # set the defaults from the dotenv files
        self.env.update(self.dotenv)
        # override the defaults passed as parameters
        self.env.update(self.params)

        # translate docker's architecture notation to a more widely used one
        arch = self.env.get('ARCH', 'amd64')
        arch_aliases = {
            'amd64': 'x86_64',
            'arm64v8': 'aarch64',
            's390x': 's390x'
        }
        arch_short_aliases = {
            'amd64': 'x64',
            'arm64v8': 'arm64',
            's390x': 's390x'
        }
        # unknown architectures are passed through unchanged
        self.env['ARCH_ALIAS'] = arch_aliases.get(arch, arch)
        self.env['ARCH_SHORT_ALIAS'] = arch_short_aliases.get(arch, arch)

    def _read_config(self, config_path, compose_bin):
        """
        Validate and read the docker-compose.yml

        The `services`, `x-hierarchy` and `x-with-gpus` sections are checked
        against each other, then the compose binary's `config` subcommand is
        executed to validate and render the file.

        Raises
        ------
        ValueError
            If any of the consistency checks fails or the compose binary
            exits with a non-zero return code.
        """
        yaml = YAML()
        with config_path.open() as fp:
            config = yaml.load(fp)

        services = config['services'].keys()
        self.hierarchy = dict(flatten(config.get('x-hierarchy', {})))
        self.limit_presets = config.get('x-limit-presets', {})
        self.with_gpus = config.get('x-with-gpus', [])
        nodes = self.hierarchy.keys()
        errors = []

        for name in self.with_gpus:
            if name not in services:
                errors.append(
                    'Service `{}` defined in `x-with-gpus` bot not in '
                    '`services`'.format(name)
                )
        for name in nodes - services:
            errors.append(
                'Service `{}` is defined in `x-hierarchy` bot not in '
                '`services`'.format(name)
            )
        for name in services - nodes:
            errors.append(
                'Service `{}` is defined in `services` but not in '
                '`x-hierarchy`'.format(name)
            )

        # trigger docker-compose's own validation using the configured
        # binary, so the validation and the later executions agree
        compose = Command(compose_bin or 'docker-compose')
        args = ['--file', str(config_path), 'config']
        result = compose.run(*args, env=self.env, check=False,
                             stderr=subprocess.PIPE, stdout=subprocess.PIPE)

        if result.returncode != 0:
            # report each line of the compose error output as an error
            errors += result.stderr.decode().splitlines()

        if errors:
            msg = '\n'.join([' - {}'.format(msg) for msg in errors])
            raise ValueError(
                'Found errors with docker-compose:\n{}'.format(msg)
            )

        # keep the rendered configuration rather than the raw file content
        rendered_config = StringIO(result.stdout.decode())
        self.path = config_path
        self.config = yaml.load(rendered_config)

    def get(self, service_name):
        """
        Return the service definition enriched with archery specific keys.

        The returned mapping is the one stored in the rendered config; the
        `name`, `need_gpu` and `ancestors` keys are set on it in place.

        Raises
        ------
        UndefinedImage
            If `service_name` is not defined in the `services` section.
        """
        try:
            service = self.config['services'][service_name]
        except KeyError:
            raise UndefinedImage(service_name)
        service['name'] = service_name
        service['need_gpu'] = service_name in self.with_gpus
        service['ancestors'] = self.hierarchy[service_name]
        return service

    def __getitem__(self, service_name):
        return self.get(service_name)


class Docker(Command):
    """Command wrapper around the plain docker cli."""

    def __init__(self, docker_bin=None):
        self.bin = default_bin(docker_bin, "docker")


class DockerCompose(Command):
    """
    Command wrapper to pull, build, run and push docker-compose services.

    Ancestor services, as defined in the `x-hierarchy` section of the compose
    file, are processed before the requested service.
    """

    def __init__(self, config_path, dotenv_path=None, compose_bin=None,
                 params=None):
        compose_bin = default_bin(compose_bin, 'docker-compose')
        self.config = ComposeConfig(config_path, dotenv_path, compose_bin,
                                    params)
        self.bin = compose_bin
        # images already attempted to pull, used to avoid repeated pulls
        self.pull_memory = set()

    def clear_pull_memory(self):
        """Forget which images have been pulled already."""
        self.pull_memory = set()

    def _execute_compose(self, *args, **kwargs):
        """
        Execute a docker-compose subcommand against the compose file.

        Raises
        ------
        RuntimeError
            If the command exits with a non-zero return code. The message
            lists the .env defaults and the parameters archery was called
            with.
        """
        # execute as a docker compose command
        try:
            result = super().run('--file', str(self.config.path), *args,
                                 env=self.config.env, **kwargs)
            result.check_returncode()
        except subprocess.CalledProcessError as e:
            def formatdict(d, template):
                return '\n'.join(
                    template.format(k, v) for k, v in sorted(d.items())
                )
            msg = (
                "`{cmd}` exited with a non-zero exit code {code}, see the "
                "process log above.\n\nThe docker-compose command was "
                "invoked with the following parameters:\n\nDefaults defined "
                "in .env:\n{dotenv}\n\nArchery was called with:\n{params}"
            )
            raise RuntimeError(
                msg.format(
                    cmd=' '.join(e.cmd),
                    code=e.returncode,
                    dotenv=formatdict(self.config.dotenv, template=' {}: {}'),
                    params=formatdict(
                        self.config.params, template=' export {}={}'
                    )
                )
            )

    def _execute_docker(self, *args, **kwargs):
        """
        Execute a plain docker cli command.

        Raises
        ------
        RuntimeError
            If the command exits with a non-zero return code. Note that the
            message contains the whole command line.
        """
        # execute as a plain docker cli command
        try:
            result = Docker().run(*args, **kwargs)
            result.check_returncode()
        except subprocess.CalledProcessError as e:
            raise RuntimeError(
                "{} exited with non-zero exit code {}".format(
                    ' '.join(e.cmd), e.returncode
                )
            )

    def pull(self, service_name, pull_leaf=True, using_docker=False):
        """
        Pull the images of the service's ancestors and optionally its own.

        Parameters
        ----------
        service_name : str
            Name of the compose service.
        pull_leaf : bool, default True
            Whether to pull the image of `service_name` itself as well.
        using_docker : bool, default False
            Pull with `docker pull` instead of `docker-compose pull`; pull
            failures are printed and otherwise ignored in this mode.
        """
        def _pull(service):
            args = ['pull']
            if service['image'] in self.pull_memory:
                return

            if using_docker:
                try:
                    self._execute_docker(*args, service['image'])
                except Exception as e:
                    # better --ignore-pull-failures handling
                    print(e)
            else:
                args.append('--ignore-pull-failures')
                self._execute_compose(*args, service['name'])

            self.pull_memory.add(service['image'])

        service = self.config.get(service_name)
        for ancestor in service['ancestors']:
            _pull(self.config.get(ancestor))
        if pull_leaf:
            _pull(service)

    def build(self, service_name, use_cache=True, use_leaf_cache=True,
              using_docker=False, using_buildx=False, pull_parents=True):
        """
        Build the images of the service's ancestors, then the service itself.

        Parameters
        ----------
        service_name : str
            Name of the compose service.
        use_cache : bool, default True
            Pass `--no-cache` to the builds if False.
        use_leaf_cache : bool, default True
            If False, only the build of `service_name` itself skips the
            cache.
        using_docker : bool, default False
            Build with `docker build` instead of `docker-compose build`.
        using_buildx : bool, default False
            Build with `docker buildx build`; takes precedence over
            `using_docker`.
        pull_parents : bool, default True
            Pull the images listed under the build's `cache_from` before
            building; pull failures are printed and otherwise ignored.
        """
        def _build(service, use_cache):
            if 'build' not in service:
                # nothing to do
                return

            args = []
            cache_from = list(service.get('build', {}).get('cache_from', []))
            if pull_parents:
                for image in cache_from:
                    if image not in self.pull_memory:
                        try:
                            self._execute_docker('pull', image)
                        except Exception as e:
                            print(e)
                        finally:
                            # remember failed pulls too, don't retry them
                            self.pull_memory.add(image)

            if not use_cache:
                args.append('--no-cache')

            # turn on inline build cache, this is a docker buildx feature
            # used to bundle the image build cache to the pushed image manifest
            # so the build cache can be reused across hosts, documented at
            # https://github.com/docker/buildx#--cache-tonametypetypekeyvalue
            if self.config.env.get('BUILDKIT_INLINE_CACHE') == '1':
                args.extend(['--build-arg', 'BUILDKIT_INLINE_CACHE=1'])

            if using_buildx:
                for k, v in service['build'].get('args', {}).items():
                    args.extend(['--build-arg', '{}={}'.format(k, v)])

                if use_cache:
                    cache_ref = '{}-cache'.format(service['image'])
                    registry_cache_from = 'type=registry,ref={}'.format(
                        cache_ref
                    )
                    registry_cache_to = (
                        'type=registry,ref={},mode=max'.format(cache_ref)
                    )
                    args.extend([
                        '--cache-from', registry_cache_from,
                        '--cache-to', registry_cache_to,
                    ])

                args.extend([
                    '--output', 'type=docker',
                    '-f', service['build']['dockerfile'],
                    '-t', service['image'],
                    service['build'].get('context', '.')
                ])
                self._execute_docker("buildx", "build", *args)
            elif using_docker:
                # better for caching
                for k, v in service['build'].get('args', {}).items():
                    args.extend(['--build-arg', '{}={}'.format(k, v)])
                for img in cache_from:
                    args.append('--cache-from="{}"'.format(img))
                args.extend([
                    '-f', service['build']['dockerfile'],
                    '-t', service['image'],
                    service['build'].get('context', '.')
                ])
                self._execute_docker("build", *args)
            else:
                self._execute_compose("build", *args, service['name'])

        service = self.config.get(service_name)
        # build ancestor services
        for ancestor in service['ancestors']:
            _build(self.config.get(ancestor), use_cache=use_cache)
        # build the leaf/target service
        _build(service, use_cache=use_cache and use_leaf_cache)

    def run(self, service_name, command=None, *, env=None, volumes=None,
            user=None, using_docker=False, resource_limit=None):
        """
        Run a service in a container which is removed afterwards.

        The plain docker cli is used if `using_docker` is set, if the service
        is listed under `x-with-gpus` or if a `resource_limit` is requested;
        otherwise the service is run through docker-compose.

        Parameters
        ----------
        service_name : str
            Name of the compose service.
        command : str, optional
            Command overriding the one defined in the compose file.
        env : dict, optional
            Additional environment variables passed with `-e`.
        volumes : list, optional
            Additional volumes passed with `--volume`.
        user : str, optional
            Passed with `-u`.
        using_docker : bool, default False
            Force using the plain docker cli.
        resource_limit : str, optional
            Name of a preset defined under `x-limit-presets`.

        Raises
        ------
        ValueError
            If `resource_limit` doesn't name a known, non-empty preset.
        """
        service = self.config.get(service_name)

        args = []
        if user is not None:
            args.extend(['-u', user])

        if env is not None:
            for k, v in env.items():
                args.extend(['-e', '{}={}'.format(k, v)])

        if volumes is not None:
            for volume in volumes:
                args.extend(['--volume', volume])

        if using_docker or service['need_gpu'] or resource_limit:
            # use gpus, requires docker>=19.03
            if service['need_gpu']:
                args.extend(['--gpus', 'all'])

            if service.get('shm_size'):
                # subprocess arguments must be strings whereas the value
                # loaded from yaml may be a non-string scalar
                args.extend(['--shm-size', str(service['shm_size'])])

            # append env variables from the compose conf
            for k, v in service.get('environment', {}).items():
                args.extend(['-e', '{}={}'.format(k, v)])

            # append volumes from the compose conf
            for v in service.get('volumes', []):
                if not isinstance(v, str):
                    # if not the compact string volume definition
                    v = "{}:{}".format(v['source'], v['target'])
                args.extend(['-v', v])

            # infer whether an interactive shell is desired or not
            if command in ['cmd.exe', 'bash', 'sh', 'powershell']:
                args.append('-it')

            if resource_limit:
                limits = self.config.limit_presets.get(resource_limit)
                if not limits:
                    raise ValueError(
                        f"Unknown resource limit preset '{resource_limit}'")
                cpuset = limits.get('cpuset_cpus', [])
                if cpuset:
                    args.append(f'--cpuset-cpus={",".join(map(str, cpuset))}')
                memory = limits.get('memory')
                if memory:
                    args.append(f'--memory={memory}')
                    # same value as --memory for the swap limit
                    args.append(f'--memory-swap={memory}')

            # get the actual docker image name instead of the compose service
            # name which we refer as image in general
            args.append(service['image'])

            # add command from compose if it wasn't overridden
            if command is not None:
                args.append(command)
            else:
                # replace whitespaces from the preformatted compose command
                cmd = _sanitize_command(service.get('command', ''))
                if cmd:
                    args.append(cmd)

            # execute as a plain docker cli command
            self._execute_docker('run', '--rm', *args)
        else:
            # execute as a docker-compose command
            args.append(service_name)
            if command is not None:
                args.append(command)
            self._execute_compose('run', '--rm', *args)

    def push(self, service_name, user=None, password=None, using_docker=False):
        """
        Push the images of the service's ancestors, then the service's own.

        Parameters
        ----------
        service_name : str
            Name of the compose service.
        user : str, optional
            If given, `docker login` is executed first with `user` and
            `password`.
        password : str, optional
            Password used for the login.
        using_docker : bool, default False
            Push with `docker push` instead of `docker-compose push`.

        Raises
        ------
        RuntimeError
            If the login or any of the pushes fails. The login failure
            message doesn't contain the credentials.
        """
        def _push(service):
            if using_docker:
                return self._execute_docker('push', service['image'])
            else:
                return self._execute_compose('push', service['name'])

        if user is not None:
            try:
                # TODO(kszucs): have an option for a prompt
                self._execute_docker('login', '-u', user, '-p', password)
            except (subprocess.CalledProcessError, RuntimeError):
                # hide credentials; _execute_docker reports failures as a
                # RuntimeError whose message echoes the whole command line
                # including the password, so that must be replaced as well
                msg = ('Failed to push `{}`, check the passed credentials'
                       .format(service_name))
                raise RuntimeError(msg) from None

        service = self.config.get(service_name)
        for ancestor in service['ancestors']:
            _push(self.config.get(ancestor))
        _push(service)

    def images(self):
        """Return the sorted names of the services in the hierarchy."""
        return sorted(self.config.hierarchy.keys())