1# -*- coding: utf-8 -*- #
2# Copyright 2019 Google LLC. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#    http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Implementation of gcloud lifesciences pipelines run.
17"""
18
19from __future__ import absolute_import
20from __future__ import division
21from __future__ import unicode_literals
22
23import base64
24from googlecloudsdk.api_lib import lifesciences as lib
25from googlecloudsdk.api_lib.lifesciences import exceptions
26from googlecloudsdk.api_lib.lifesciences import lifesciences_util
27from googlecloudsdk.calliope import arg_parsers
28from googlecloudsdk.calliope import base
29from googlecloudsdk.calliope.concepts import concepts
30from googlecloudsdk.command_lib.util.apis import yaml_data
31from googlecloudsdk.command_lib.util.args import labels_util
32from googlecloudsdk.command_lib.util.concepts import concept_parsers
33from googlecloudsdk.core import log
34from googlecloudsdk.core import properties
35from googlecloudsdk.core.util import files
36import six
37
# Default Docker image for --command-line pipelines, also used for the
# generated helper actions (input localization, output/log copying).
CLOUD_SDK_IMAGE = 'google/cloud-sdk:slim'
# Name of the disk shared between actions for localized inputs and outputs.
SHARED_DISK = 'gcloud-shared'
40
41
class _SharedPathGenerator(object):
  """Produces sequential, distinct paths on the shared disk.

  Successive calls to Generate() yield /gcloud-shared/<root>0,
  /gcloud-shared/<root>1, and so on.
  """

  def __init__(self, root):
    self.root = root
    self.index = -1

  def Generate(self):
    """Return the next unused shared path for this root."""
    self.index += 1
    return '/{disk}/{root}{n}'.format(
        disk=SHARED_DISK, root=self.root, n=self.index)
51
52
53def _ValidateAndMergeArgInputs(args):
54  """Turn args.inputs and args.inputs_from_file dicts into a single dict.
55
56  Args:
57    args: The parsed command-line arguments
58
59  Returns:
60    A dict that is the merge of args.inputs and args.inputs_from_file
61  Raises:
62    files.Error
63  """
64
65  is_local_file = {}
66
67  # If no inputs from file, then no validation or merge needed
68  if not args.inputs_from_file:
69    return args.inputs, is_local_file
70
71  # Initialize the merged dictionary
72  arg_inputs = {}
73
74  if args.inputs:
75    # Validate args.inputs and args.inputs-from-file do not overlap
76    overlap = set(args.inputs.keys()).intersection(
77        set(args.inputs_from_file.keys()))
78    if overlap:
79      raise exceptions.LifeSciencesError(
80          '--{0} and --{1} may not specify overlapping values: {2}'
81          .format('inputs', 'inputs-from-file', ', '.join(overlap)))
82
83    # Add the args.inputs
84    arg_inputs.update(args.inputs)
85
86  # Read up the inputs-from-file and add the values from the file
87  for key, value in six.iteritems(args.inputs_from_file):
88    arg_inputs[key] = files.ReadFileContents(value)
89    is_local_file[key] = True
90
91  return arg_inputs, is_local_file
92
93
class Run(base.SilentCommand):
  r"""Defines and runs a pipeline.

  A pipeline is a transformation of a set of inputs to a set of outputs.
  Supports Docker-based commands.

  ## EXAMPLES
  To run a pipeline described in the `pipeline.json` file, run:

    $ {command} --pipeline-file=pipeline.json
  """

  @staticmethod
  def Args(parser):
    """Args is called by calliope to gather arguments for this command.

    Args:
      parser: An argparse parser that you can use to add arguments that go
          on the command line after this command. Positional arguments are
          allowed.
    """
    location_spec = concepts.ResourceSpec.FromYaml(
        yaml_data.ResourceYAMLData.FromPath('lifesciences.location')
        .GetData())
    concept_parsers.ConceptParser.ForResource(
        '--location',
        location_spec,
        'The Google Cloud location to run the pipeline.',
        required=True).AddToParser(parser)

    # Exactly one of --pipeline-file or --command-line must be supplied.
    pipeline = parser.add_mutually_exclusive_group(required=True)
    pipeline.add_argument(
        '--pipeline-file',
        help='''A YAML or JSON file containing a Pipeline object. See
[](https://cloud.google.com/life-sciences/docs/reference/rest/v2beta/projects.locations.pipelines/run#pipeline)
''')
    pipeline.add_argument(
        '--command-line',
        category=base.COMMONLY_USED_FLAGS,
        help='''Command line to run with /bin/sh in the specified
            Docker image. Cannot be used with --pipeline-file.''')

    parser.add_argument(
        '--docker-image',
        category=base.COMMONLY_USED_FLAGS,
        default=CLOUD_SDK_IMAGE,
        help='''A Docker image to run. Requires --command-line to
            be specified and cannot be used with --pipeline-file.''')

    parser.add_argument(
        '--inputs',
        category=base.COMMONLY_USED_FLAGS,
        metavar='NAME=VALUE',
        type=arg_parsers.ArgDict(),
        action=arg_parsers.UpdateAction,
        help='''Map of input PipelineParameter names to values.
            Used to pass literal parameters to the pipeline, and to specify
            input files in Google Cloud Storage that will have a localCopy
            made. Specified as a comma-separated list: --inputs
            file=gs://my-bucket/in.txt,name=hello''')

    parser.add_argument(
        '--inputs-from-file',
        category=base.COMMONLY_USED_FLAGS,
        metavar='NAME=FILE',
        type=arg_parsers.ArgDict(),
        action=arg_parsers.UpdateAction,
        help='''Map of input PipelineParameter names to values.
            Used to pass literal parameters to the pipeline where values come
            from local files; this can be used to send large pipeline input
            parameters, such as code, data, or configuration values.
            Specified as a comma-separated list:
            --inputs-from-file script=myshellscript.sh,pyfile=mypython.py''')

    parser.add_argument(
        '--outputs',
        category=base.COMMONLY_USED_FLAGS,
        metavar='NAME=VALUE',
        type=arg_parsers.ArgDict(),
        action=arg_parsers.UpdateAction,
        help='''Map of output PipelineParameter names to values.
            Used to specify output files in Google Cloud Storage that will be
            made from a localCopy. Specified as a comma-separated list:
            --outputs ref=gs://my-bucket/foo,ref2=gs://my-bucket/bar''')

    parser.add_argument(
        '--logging',
        category=base.COMMONLY_USED_FLAGS,
        help='''The location in Google Cloud Storage to which the pipeline logs
            will be copied. Can be specified as a fully qualified directory
            path, in which case logs will be output with a unique identifier
            as the filename in that directory, or as a fully specified path,
            which must end in `.log`, in which case that path will be
            used. Stdout and stderr logs from the run are also generated and
            output as `-stdout.log` and `-stderr.log`.''')

    parser.add_argument(
        '--env-vars',
        category=base.COMMONLY_USED_FLAGS,
        metavar='NAME=VALUE',
        type=arg_parsers.ArgDict(),
        help='''List of key-value pairs to set as environment variables.''')

    labels_util.AddCreateLabelsFlags(parser)

    parser.add_argument(
        '--disk-size',
        category=base.COMMONLY_USED_FLAGS,
        default=None,
        help='''The disk size(s) in GB, specified as a comma-separated list of
            pairs of disk name and size. For example:
            --disk-size "name:size,name2:size2".
            Overrides any values specified in the pipeline-file.''')

    parser.add_argument(
        '--preemptible',
        category=base.COMMONLY_USED_FLAGS,
        action='store_true',
        help='''Whether to use a preemptible VM for this pipeline. The
            "resource" section of the pipeline-file must also set preemptible
            to "true" for this flag to take effect.''')

    parser.add_argument(
        '--run-id',
        hidden=True,
        help='THIS ARGUMENT NEEDS HELP TEXT.')

    parser.add_argument(
        '--service-account-email',
        default='default',
        help='''The service account used to run the pipeline. If unspecified,
            defaults to the Compute Engine service account for your project.''')

    parser.add_argument(
        '--service-account-scopes',
        metavar='SCOPE',
        type=arg_parsers.ArgList(),
        default=[],
        help='''List of additional scopes to be made available for this service
             account. The following scopes are always requested:

             https://www.googleapis.com/auth/cloud-platform''')

    parser.add_argument(
        '--machine-type',
        default='n1-standard-1',
        help='''The type of VirtualMachine to use. Defaults to n1-standard-1.''')

    parser.add_argument(
        '--zones',
        metavar='ZONE',
        type=arg_parsers.ArgList(),
        help='''List of Compute Engine zones the pipeline can run in.

If no zones are specified with the zones flag, then zones in the
pipeline definition file will be used.

If no zones are specified in the pipeline definition, then the
default zone in your local client configuration is used (and must be specified).

For more information on default zones, see
https://cloud.google.com/compute/docs/gcloud-compute/#set_default_zone_and_region_in_your_local_client''')

    parser.add_argument(
        '--regions',
        metavar='REGION',
        type=arg_parsers.ArgList(),
        help='''List of Compute Engine regions the pipeline can
            run in.

If no regions are specified with the regions flag, then regions in the
pipeline definition file will be used.

If no regions are specified in the pipeline definition, then the
default region in your local client configuration is used.

At least one zone or region must be specified.

For more information on default regions, see
https://cloud.google.com/compute/docs/gcloud-compute/#set_default_zone_and_region_in_your_local_client''')

    parser.add_argument(
        '--network',
        help='''The network name to attach the VM's network
            interface to.

The value will be prefixed with global/networks/ unless it contains a /, in
which case it is assumed to be a fully specified network resource URL.

If unspecified, the global default network is used.''')

    parser.add_argument(
        '--subnetwork',
        help='''The subnetwork to use on the provided network.

If the specified network is configured for custom subnet creation, the name of
the subnetwork to attach the instance to must be specified here.

The value is prefixed with regions/*/subnetworks/ unless it contains a /, in
which case it is assumed to be a fully specified subnetwork resource URL.

If the * character appears in the value, it is replaced with the region that
the virtual machine has been allocated in.''')

    parser.add_argument(
        '--boot-disk-size',
        type=int,
        help='''The size of the boot disk in GB.

The boot disk size must be large enough to accommodate all Docker images from
each action in the pipeline at the same time. If not specified, a small but
reasonable default value is used.''')

  def Run(self, args):
    """This is what gets called when the user runs this command.

    Args:
      args: argparse.Namespace, All the arguments that were provided to this
        command invocation.

    Raises:
      files.Error: A file argument could not be read.
      LifeSciencesError: User input was invalid.
      HttpException: An http error response was received while executing api
          request.
    Returns:
      Operation representing the running pipeline.
    """
    pipeline = None
    apitools_client = lifesciences_util.GetLifeSciencesClient('v2beta')
    lifesciences_messages = lifesciences_util.GetLifeSciencesMessages('v2beta')
    if args.pipeline_file:
      pipeline = lifesciences_util.GetFileAsMessage(
          args.pipeline_file,
          lifesciences_messages.Pipeline,
          self.context[lib.STORAGE_V1_CLIENT_KEY])
    elif args.command_line:
      # Wrap the raw command line in a single bash action.
      pipeline = lifesciences_messages.Pipeline(
          actions=[lifesciences_messages.Action(
              imageUri=args.docker_image,
              commands=['-c', args.command_line],
              entrypoint='bash')])

    arg_inputs, is_local_file = _ValidateAndMergeArgInputs(args)

    # Create messages up front to avoid checking for None everywhere.
    if not pipeline.resources:
      pipeline.resources = lifesciences_messages.Resources()
    resources = pipeline.resources

    if not resources.virtualMachine:
      resources.virtualMachine = lifesciences_messages.VirtualMachine(
          machineType=args.machine_type)
    virtual_machine = resources.virtualMachine

    if not virtual_machine.serviceAccount:
      virtual_machine.serviceAccount = lifesciences_messages.ServiceAccount()

    if args.preemptible:
      virtual_machine.preemptible = args.preemptible

    # Flags override the pipeline file; the local client configuration is the
    # final fallback when neither specifies a zone/region.
    if args.zones:
      resources.zones = args.zones
    elif not resources.zones and properties.VALUES.compute.zone.Get():
      resources.zones = [properties.VALUES.compute.zone.Get()]

    if args.regions:
      resources.regions = args.regions
    elif not resources.regions and properties.VALUES.compute.region.Get():
      resources.regions = [properties.VALUES.compute.region.Get()]

    if args.service_account_email != 'default':
      virtual_machine.serviceAccount.email = args.service_account_email

    if args.service_account_scopes:
      virtual_machine.serviceAccount.scopes = args.service_account_scopes

    # Always add the cloud-platform scope for user convenience.
    virtual_machine.serviceAccount.scopes.append(
        'https://www.googleapis.com/auth/cloud-platform')

    # Attach custom network/subnetwork (if set).
    if args.network or args.subnetwork:
      if not virtual_machine.network:
        virtual_machine.network = lifesciences_messages.Network()
      if args.network:
        virtual_machine.network.network = args.network
      if args.subnetwork:
        virtual_machine.network.subnetwork = args.subnetwork

    if args.boot_disk_size is not None:
      if args.boot_disk_size <= 0:
        raise exceptions.LifeSciencesError(
            'Boot disk size must be greater than zero.')
      virtual_machine.bootDiskSizeGb = args.boot_disk_size

    # Generate paths for inputs and outputs in a shared location and put them
    # into the environment for actions based on their name.
    env = {}
    if arg_inputs:
      input_generator = _SharedPathGenerator('input')
      for name, value in arg_inputs.items():
        if lifesciences_util.IsGcsPath(value):
          # GCS inputs are localized by a generated action prepended to the
          # pipeline; the destination path is read from the $name env var.
          env[name] = input_generator.Generate()
          pipeline.actions.insert(0, lifesciences_messages.Action(
              imageUri=CLOUD_SDK_IMAGE,
              commands=['/bin/sh', '-c', 'gsutil -m -q cp %s ${%s}' %
                        (value, name)]))
        elif name in is_local_file:
          # Local file contents are embedded base64-encoded so arbitrary data
          # passes safely through the shell, then decoded onto the shared disk.
          env[name] = input_generator.Generate()
          pipeline.actions.insert(
              0,
              lifesciences_messages.Action(
                  imageUri=CLOUD_SDK_IMAGE,
                  commands=[
                      '/bin/sh', '-c',
                      'echo "%s" | base64 -d > ${%s}' %
                      (base64.b64encode(value.encode()).decode(), name)
                  ]))
        else:
          env[name] = value

    if args.outputs:
      output_generator = _SharedPathGenerator('output')
      for name, value in args.outputs.items():
        # Outputs are delocalized by generated actions appended after the
        # user's actions have run.
        env[name] = output_generator.Generate()
        pipeline.actions.append(lifesciences_messages.Action(
            imageUri=CLOUD_SDK_IMAGE,
            commands=['/bin/sh', '-c',
                      'gsutil -m -q cp ${%s} %s' % (name, value)]))
    if args.env_vars:
      for name, value in args.env_vars.items():
        env[name] = value

    # Merge any existing pipeline arguments into the generated environment and
    # update the pipeline. Values computed above take precedence.
    if pipeline.environment:
      for val in pipeline.environment.additionalProperties:
        if val.key not in env:
          env[val.key] = val.value

    pipeline.environment = lifesciences_messages.Pipeline.EnvironmentValue(
        additionalProperties=lifesciences_util.ArgDictToAdditionalPropertiesList(
            env,
            lifesciences_messages.Pipeline.EnvironmentValue.AdditionalProperty))

    # Localized inputs/outputs all live on one shared disk that is mounted
    # into every action.
    if arg_inputs or args.outputs:
      virtual_machine.disks.append(lifesciences_messages.Disk(
          name=SHARED_DISK))

      for action in pipeline.actions:
        action.mounts.append(lifesciences_messages.Mount(
            disk=SHARED_DISK,
            path='/' + SHARED_DISK))

    if args.logging:
      # alwaysRun ensures the logs are copied even if an earlier action fails.
      pipeline.actions.append(lifesciences_messages.Action(
          imageUri=CLOUD_SDK_IMAGE,
          commands=['/bin/sh', '-c',
                    'gsutil -m -q cp /google/logs/output ' + args.logging],
          alwaysRun=True))

    # Update disk sizes if specified, potentially including the shared disk.
    if args.disk_size:
      disk_sizes = {}
      for disk_encoding in args.disk_size.split(','):
        parts = disk_encoding.split(':', 1)
        try:
          disk_sizes[parts[0]] = int(parts[1])
        except (IndexError, ValueError):
          # Missing ':' separator or a non-integer size.
          raise exceptions.LifeSciencesError('Invalid --disk-size.')

      for disk in virtual_machine.disks:
        if disk.name in disk_sizes:
          disk.sizeGb = disk_sizes[disk.name]

    request = lifesciences_messages.RunPipelineRequest(
        pipeline=pipeline,
        labels=labels_util.ParseCreateArgs(
            args, lifesciences_messages.RunPipelineRequest.LabelsValue))
    # NOTE(review): the return value was previously bound to an unused local;
    # the call is kept in case it validates that a project is configured.
    # Confirm before removing.
    lifesciences_util.GetProjectId()
    location_ref = args.CONCEPTS.location.Parse()
    request_wrapper = lifesciences_messages.LifesciencesProjectsLocationsPipelinesRunRequest(
        parent=location_ref.RelativeName(),
        runPipelineRequest=request)

    result = apitools_client.projects_locations_pipelines.Run(request_wrapper)
    log.status.Print('Running [{0}].'.format(result.name))
    return result
484