1# -*- coding: utf-8 -*- # 2# Copyright 2019 Google LLC. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"""Implementation of gcloud lifesciences pipelines run. 17""" 18 19from __future__ import absolute_import 20from __future__ import division 21from __future__ import unicode_literals 22 23import base64 24from googlecloudsdk.api_lib import lifesciences as lib 25from googlecloudsdk.api_lib.lifesciences import exceptions 26from googlecloudsdk.api_lib.lifesciences import lifesciences_util 27from googlecloudsdk.calliope import arg_parsers 28from googlecloudsdk.calliope import base 29from googlecloudsdk.calliope.concepts import concepts 30from googlecloudsdk.command_lib.util.apis import yaml_data 31from googlecloudsdk.command_lib.util.args import labels_util 32from googlecloudsdk.command_lib.util.concepts import concept_parsers 33from googlecloudsdk.core import log 34from googlecloudsdk.core import properties 35from googlecloudsdk.core.util import files 36import six 37 38CLOUD_SDK_IMAGE = 'google/cloud-sdk:slim' 39SHARED_DISK = 'gcloud-shared' 40 41 42class _SharedPathGenerator(object): 43 44 def __init__(self, root): 45 self.root = root 46 self.index = -1 47 48 def Generate(self): 49 self.index += 1 50 return '/%s/%s%d' % (SHARED_DISK, self.root, self.index) 51 52 53def _ValidateAndMergeArgInputs(args): 54 """Turn args.inputs and args.inputs_from_file dicts into a single dict. 55 56 Args: 57 args: The parsed command-line arguments 58 59 Returns: 60 A dict that is the merge of args.inputs and args.inputs_from_file 61 Raises: 62 files.Error 63 """ 64 65 is_local_file = {} 66 67 # If no inputs from file, then no validation or merge needed 68 if not args.inputs_from_file: 69 return args.inputs, is_local_file 70 71 # Initialize the merged dictionary 72 arg_inputs = {} 73 74 if args.inputs: 75 # Validate args.inputs and args.inputs-from-file do not overlap 76 overlap = set(args.inputs.keys()).intersection( 77 set(args.inputs_from_file.keys())) 78 if overlap: 79 raise exceptions.LifeSciencesError( 80 '--{0} and --{1} may not specify overlapping values: {2}' 81 .format('inputs', 'inputs-from-file', ', '.join(overlap))) 82 83 # Add the args.inputs 84 arg_inputs.update(args.inputs) 85 86 # Read up the inputs-from-file and add the values from the file 87 for key, value in six.iteritems(args.inputs_from_file): 88 arg_inputs[key] = files.ReadFileContents(value) 89 is_local_file[key] = True 90 91 return arg_inputs, is_local_file 92 93 94class Run(base.SilentCommand): 95 r"""Defines and runs a pipeline. 96 97 A pipeline is a transformation of a set of inputs to a set of outputs. 98 Supports Docker-based commands. 99 100 ## EXAMPLES 101 To run a pipeline described in the `pipeline.json` file, run: 102 103 $ {command} --pipeline-file=pipeline.json 104 """ 105 106 @staticmethod 107 def Args(parser): 108 """Args is called by calliope to gather arguments for this command. 109 110 Args: 111 parser: An argparse parser that you can use to add arguments that go 112 on the command line after this command. Positional arguments are 113 allowed. 114 """ 115 location_spec = concepts.ResourceSpec.FromYaml( 116 yaml_data.ResourceYAMLData.FromPath('lifesciences.location') 117 .GetData()) 118 concept_parsers.ConceptParser.ForResource( 119 '--location', 120 location_spec, 121 'The Google Cloud location to run the pipeline.', 122 required=True).AddToParser(parser) 123 124 pipeline = parser.add_mutually_exclusive_group(required=True) 125 pipeline.add_argument( 126 '--pipeline-file', 127 help='''A YAML or JSON file containing a Pipeline object. See 128[](https://cloud.google.com/life-sciences/docs/reference/rest/v2beta/projects.locations.pipelines/run#pipeline) 129''') 130 pipeline.add_argument( 131 '--command-line', 132 category=base.COMMONLY_USED_FLAGS, 133 help='''Command line to run with /bin/sh in the specified 134 Docker image. Cannot be used with --pipeline-file.''') 135 136 parser.add_argument( 137 '--docker-image', 138 category=base.COMMONLY_USED_FLAGS, 139 default=CLOUD_SDK_IMAGE, 140 help='''A Docker image to run. Requires --command-line to 141 be specified and cannot be used with --pipeline-file.''') 142 143 parser.add_argument( 144 '--inputs', 145 category=base.COMMONLY_USED_FLAGS, 146 metavar='NAME=VALUE', 147 type=arg_parsers.ArgDict(), 148 action=arg_parsers.UpdateAction, 149 help='''Map of input PipelineParameter names to values. 150 Used to pass literal parameters to the pipeline, and to specify 151 input files in Google Cloud Storage that will have a localCopy 152 made. Specified as a comma-separated list: --inputs 153 file=gs://my-bucket/in.txt,name=hello''') 154 155 parser.add_argument( 156 '--inputs-from-file', 157 category=base.COMMONLY_USED_FLAGS, 158 metavar='NAME=FILE', 159 type=arg_parsers.ArgDict(), 160 action=arg_parsers.UpdateAction, 161 help='''Map of input PipelineParameter names to values. 162 Used to pass literal parameters to the pipeline where values come 163 from local files; this can be used to send large pipeline input 164 parameters, such as code, data, or configuration values. 165 Specified as a comma-separated list: 166 --inputs-from-file script=myshellscript.sh,pyfile=mypython.py''') 167 168 parser.add_argument( 169 '--outputs', 170 category=base.COMMONLY_USED_FLAGS, 171 metavar='NAME=VALUE', 172 type=arg_parsers.ArgDict(), 173 action=arg_parsers.UpdateAction, 174 help='''Map of output PipelineParameter names to values. 175 Used to specify output files in Google Cloud Storage that will be 176 made from a localCopy. Specified as a comma-separated list: 177 --outputs ref=gs://my-bucket/foo,ref2=gs://my-bucket/bar''') 178 179 parser.add_argument( 180 '--logging', 181 category=base.COMMONLY_USED_FLAGS, 182 help='''The location in Google Cloud Storage to which the pipeline logs 183 will be copied. Can be specified as a fully qualified directory 184 path, in which case logs will be output with a unique identifier 185 as the filename in that directory, or as a fully specified path, 186 which must end in `.log`, in which case that path will be 187 used. Stdout and stderr logs from the run are also generated and 188 output as `-stdout.log` and `-stderr.log`.''') 189 190 parser.add_argument( 191 '--env-vars', 192 category=base.COMMONLY_USED_FLAGS, 193 metavar='NAME=VALUE', 194 type=arg_parsers.ArgDict(), 195 help='''List of key-value pairs to set as environment variables.''') 196 197 labels_util.AddCreateLabelsFlags(parser) 198 199 parser.add_argument( 200 '--disk-size', 201 category=base.COMMONLY_USED_FLAGS, 202 default=None, 203 help='''The disk size(s) in GB, specified as a comma-separated list of 204 pairs of disk name and size. For example: 205 --disk-size "name:size,name2:size2". 206 Overrides any values specified in the pipeline-file.''') 207 208 parser.add_argument( 209 '--preemptible', 210 category=base.COMMONLY_USED_FLAGS, 211 action='store_true', 212 help='''Whether to use a preemptible VM for this pipeline. The 213 "resource" section of the pipeline-file must also set preemptible 214 to "true" for this flag to take effect.''') 215 216 parser.add_argument( 217 '--run-id', 218 hidden=True, 219 help='THIS ARGUMENT NEEDS HELP TEXT.') 220 221 parser.add_argument( 222 '--service-account-email', 223 default='default', 224 help='''The service account used to run the pipeline. If unspecified, 225 defaults to the Compute Engine service account for your project.''') 226 227 parser.add_argument( 228 '--service-account-scopes', 229 metavar='SCOPE', 230 type=arg_parsers.ArgList(), 231 default=[], 232 help='''List of additional scopes to be made available for this service 233 account. The following scopes are always requested: 234 235 https://www.googleapis.com/auth/cloud-platform''') 236 237 parser.add_argument( 238 '--machine-type', 239 default='n1-standard-1', 240 help='''The type of VirtualMachine to use. Defaults to n1-standard-1.''') 241 242 parser.add_argument( 243 '--zones', 244 metavar='ZONE', 245 type=arg_parsers.ArgList(), 246 help='''List of Compute Engine zones the pipeline can run in. 247 248If no zones are specified with the zones flag, then zones in the 249pipeline definition file will be used. 250 251If no zones are specified in the pipeline definition, then the 252default zone in your local client configuration is used (and must be specified). 253 254For more information on default zones, see 255https://cloud.google.com/compute/docs/gcloud-compute/#set_default_zone_and_region_in_your_local_client''') 256 257 parser.add_argument( 258 '--regions', 259 metavar='REGION', 260 type=arg_parsers.ArgList(), 261 help='''List of Compute Engine regions the pipeline can 262 run in. 263 264If no regions are specified with the regions flag, then regions in the 265pipeline definition file will be used. 266 267If no regions are specified in the pipeline definition, then the 268default region in your local client configuration is used. 269 270At least one region or region must be specified. 271 272For more information on default regions, see 273https://cloud.google.com/compute/docs/gcloud-compute/#set_default_zone_and_region_in_your_local_client''') 274 275 parser.add_argument( 276 '--network', 277 help='''The network name to attach the VM's network 278 interface to. 279 280The value will be prefixed with global/networks/ unless it contains a /, in 281which case it is assumed to be a fully specified network resource URL. 282 283If unspecified, the global default network is used.''') 284 285 parser.add_argument( 286 '--subnetwork', 287 help='''The subnetwork to use on the provided network. 288 289If the specified network is configured for custom subnet creation, the name of 290the subnetwork to attach the instance to must be specified here. 291 292The value is prefixed with regions/*/subnetworks/ unless it contains a /, in 293which case it is assumed to be a fully specified subnetwork resource URL. 294 295If the * character appears in the value, it is replaced with the region that 296the virtual machine has been allocated in.''') 297 298 parser.add_argument( 299 '--boot-disk-size', 300 type=int, 301 help='''The size of the boot disk in GB. 302 303The boot disk size must be large enough to accommodate all Docker images from 304each action in the pipeline at the same time. If not specified, a small but 305reasonable default value is used.''') 306 307 def Run(self, args): 308 """This is what gets called when the user runs this command. 309 310 Args: 311 args: argparse.Namespace, All the arguments that were provided to this 312 command invocation. 313 314 Raises: 315 files.Error: A file argument could not be read. 316 LifeSciencesError: User input was invalid. 317 HttpException: An http error response was received while executing api 318 request. 319 Returns: 320 Operation representing the running pipeline. 321 """ 322 pipeline = None 323 apitools_client = lifesciences_util.GetLifeSciencesClient('v2beta') 324 lifesciences_messages = lifesciences_util.GetLifeSciencesMessages('v2beta') 325 if args.pipeline_file: 326 pipeline = lifesciences_util.GetFileAsMessage( 327 args.pipeline_file, 328 lifesciences_messages.Pipeline, 329 self.context[lib.STORAGE_V1_CLIENT_KEY]) 330 elif args.command_line: 331 pipeline = lifesciences_messages.Pipeline( 332 actions=[lifesciences_messages.Action( 333 imageUri=args.docker_image, 334 commands=['-c', args.command_line], 335 entrypoint='bash')]) 336 337 arg_inputs, is_local_file = _ValidateAndMergeArgInputs(args) 338 339 request = None 340 # Create messages up front to avoid checking for None everywhere. 341 if not pipeline.resources: 342 pipeline.resources = lifesciences_messages.Resources() 343 resources = pipeline.resources 344 345 if not resources.virtualMachine: 346 resources.virtualMachine = lifesciences_messages.VirtualMachine( 347 machineType=args.machine_type) 348 virtual_machine = resources.virtualMachine 349 350 if not virtual_machine.serviceAccount: 351 virtual_machine.serviceAccount = lifesciences_messages.ServiceAccount() 352 353 if args.preemptible: 354 virtual_machine.preemptible = args.preemptible 355 356 if args.zones: 357 resources.zones = args.zones 358 elif not resources.zones and properties.VALUES.compute.zone.Get(): 359 resources.zones = [properties.VALUES.compute.zone.Get()] 360 361 if args.regions: 362 resources.regions = args.regions 363 elif not resources.regions and properties.VALUES.compute.region.Get(): 364 resources.regions = [properties.VALUES.compute.region.Get()] 365 366 if args.service_account_email != 'default': 367 virtual_machine.serviceAccount.email = args.service_account_email 368 369 if args.service_account_scopes: 370 virtual_machine.serviceAccount.scopes = args.service_account_scopes 371 372 # Always add the cloud-platform scope for user convenience. 373 virtual_machine.serviceAccount.scopes.append( 374 'https://www.googleapis.com/auth/cloud-platform') 375 376 # Attach custom network/subnetwork (if set). 377 if args.network or args.subnetwork: 378 if not virtual_machine.network: 379 virtual_machine.network = lifesciences_messages.Network() 380 if args.network: 381 virtual_machine.network.network = args.network 382 if args.subnetwork: 383 virtual_machine.network.subnetwork = args.subnetwork 384 385 if args.boot_disk_size is not None: 386 if args.boot_disk_size <= 0: 387 raise exceptions.LifeSciencesError( 388 'Boot disk size must be greater than zero.') 389 virtual_machine.bootDiskSizeGb = args.boot_disk_size 390 391 # Generate paths for inputs and outputs in a shared location and put them 392 # into the environment for actions based on their name. 393 env = {} 394 if arg_inputs: 395 input_generator = _SharedPathGenerator('input') 396 for name, value in arg_inputs.items(): 397 if lifesciences_util.IsGcsPath(value): 398 env[name] = input_generator.Generate() 399 pipeline.actions.insert(0, lifesciences_messages.Action( 400 imageUri=CLOUD_SDK_IMAGE, 401 commands=['/bin/sh', '-c', 'gsutil -m -q cp %s ${%s}' % 402 (value, name)])) 403 elif name in is_local_file: 404 env[name] = input_generator.Generate() 405 pipeline.actions.insert( 406 0, 407 lifesciences_messages.Action( 408 imageUri=CLOUD_SDK_IMAGE, 409 commands=[ 410 '/bin/sh', '-c', 411 'echo "%s" | base64 -d > ${%s}' % 412 (base64.b64encode(value.encode()).decode(), name) 413 ])) 414 else: 415 env[name] = value 416 417 if args.outputs: 418 output_generator = _SharedPathGenerator('output') 419 for name, value in args.outputs.items(): 420 env[name] = output_generator.Generate() 421 pipeline.actions.append(lifesciences_messages.Action( 422 imageUri=CLOUD_SDK_IMAGE, 423 commands=['/bin/sh', '-c', 'gsutil -m -q cp ${%s} %s' % (name, 424 value)])) 425 if args.env_vars: 426 for name, value in args.env_vars.items(): 427 env[name] = value 428 429 # Merge any existing pipeline arguments into the generated environment and 430 # update the pipeline. 431 if pipeline.environment: 432 for val in pipeline.environment.additionalProperties: 433 if val.key not in env: 434 env[val.key] = val.value 435 436 pipeline.environment = lifesciences_messages.Pipeline.EnvironmentValue( 437 additionalProperties=lifesciences_util.ArgDictToAdditionalPropertiesList( 438 env, 439 lifesciences_messages.Pipeline.EnvironmentValue.AdditionalProperty)) 440 441 if arg_inputs or args.outputs: 442 virtual_machine.disks.append(lifesciences_messages.Disk( 443 name=SHARED_DISK)) 444 445 for action in pipeline.actions: 446 action.mounts.append(lifesciences_messages.Mount( 447 disk=SHARED_DISK, 448 path='/' + SHARED_DISK)) 449 450 if args.logging: 451 pipeline.actions.append(lifesciences_messages.Action( 452 imageUri=CLOUD_SDK_IMAGE, 453 commands=['/bin/sh', '-c', 454 'gsutil -m -q cp /google/logs/output ' + args.logging], 455 alwaysRun=True)) 456 457 # Update disk sizes if specified, potentially including the shared disk. 458 if args.disk_size: 459 disk_sizes = {} 460 for disk_encoding in args.disk_size.split(','): 461 parts = disk_encoding.split(':', 1) 462 try: 463 disk_sizes[parts[0]] = int(parts[1]) 464 except: 465 raise exceptions.LifeSciencesError('Invalid --disk-size.') 466 467 for disk in virtual_machine.disks: 468 if disk.name in disk_sizes: 469 disk.sizeGb = disk_sizes[disk.name] 470 471 request = lifesciences_messages.RunPipelineRequest( 472 pipeline=pipeline, 473 labels=labels_util.ParseCreateArgs( 474 args, lifesciences_messages.RunPipelineRequest.LabelsValue)) 475 projectId = lifesciences_util.GetProjectId() 476 location_ref = args.CONCEPTS.location.Parse() 477 request_wrapper = lifesciences_messages.LifesciencesProjectsLocationsPipelinesRunRequest( 478 parent=location_ref.RelativeName(), 479 runPipelineRequest=request) 480 481 result = apitools_client.projects_locations_pipelines.Run(request_wrapper) 482 log.status.Print('Running [{0}].'.format(result.name)) 483 return result 484