# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16"""Base class for Spark Job."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import argparse

from apitools.base.py import encoding

from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.command_lib.dataproc.jobs import base as job_base

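# Illustrative invocation of a command built on this base class (the cluster
# name, main class, and URIs below are placeholders, not defaults):
#
#   gcloud dataproc jobs submit spark --cluster=my-cluster \
#       --class=org.example.WordCount \
#       --jars=gs://my-bucket/extra-dep.jar \
#       --properties=spark.executor.cores=2 \
#       -- gs://my-bucket/input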

class SparkBase(job_base.JobBase):
  """Submit a Java or Scala Spark job to a cluster."""

  @staticmethod
  def Args(parser):
    """Registers flags for Spark job submission on the given parser."""
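    # Each flag registered here feeds a field of the Dataproc SparkJob
    # message (jarFileUris, fileUris, archiveUris, args, properties,
    # loggingConfig); see ConfigureJob() below.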
    parser.add_argument(
        '--jars',
        type=arg_parsers.ArgList(),
        metavar='JAR',
        default=[],
        help=('Comma-separated list of jar files to be provided to the '
              'executor and driver classpaths.'))
    parser.add_argument(
        '--files',
        type=arg_parsers.ArgList(),
        metavar='FILE',
        default=[],
        help=('Comma-separated list of files to be placed in the working '
              'directory of both the app master and executors.'))
    parser.add_argument(
        '--archives',
        type=arg_parsers.ArgList(),
        metavar='ARCHIVE',
        default=[],
        help=('Comma-separated list of archives to be extracted into the '
              'working directory of each executor. Each archive must be in '
              'one of the following formats: .zip, .tar, .tar.gz, or .tgz.'))
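    # argparse.REMAINDER collects all remaining command-line tokens verbatim
    # (typically everything after a '--' separator), so arguments intended
    # for the Spark driver are passed through instead of being parsed as
    # gcloud flags.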
    parser.add_argument(
        'job_args',
        nargs=argparse.REMAINDER,
        help='Arguments to pass to the driver.')
    parser.add_argument(
        '--properties',
        type=arg_parsers.ArgDict(),
        metavar='PROPERTY=VALUE',
        help='List of key=value pairs to configure Spark. For a list of '
             'available properties, see: '
             'https://spark.apache.org/docs/latest/'
             'configuration.html#available-properties.')
    parser.add_argument(
        '--driver-log-levels',
        type=arg_parsers.ArgDict(),
        metavar='PACKAGE=LEVEL',
        help=('List of PACKAGE=LEVEL pairs that configure log4j logging for '
              'the driver. For example: root=FATAL,com.example=INFO'))

  @staticmethod
  def GetFilesByType(args):
    """Returns a dict of files by their type (jars, archives, etc.)."""
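    # main_jar and main_class are not registered in Args() above; they are
    # expected to be supplied by flags that the concrete submit command
    # defines (a job is driven by either a main jar or a main class).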
    return {
        'main_jar': args.main_jar,
        'jars': args.jars,
        'archives': args.archives,
        'files': args.files}

  @staticmethod
  def ConfigureJob(messages, job, files_by_type, logging_config, args):
    """Populates the sparkJob member of the given job."""

    spark_job = messages.SparkJob(
        args=args.job_args or [],
        archiveUris=files_by_type['archives'],
        fileUris=files_by_type['files'],
        jarFileUris=files_by_type['jars'],
        mainClass=args.main_class,
        mainJarFileUri=files_by_type['main_jar'],
        loggingConfig=logging_config)

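    # The properties proto map field is represented as a message wrapping a
    # repeated additionalProperties list; the apitools helper converts the
    # parsed --properties dict into that representation.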
    if args.properties:
      spark_job.properties = encoding.DictToAdditionalPropertyMessage(
          args.properties, messages.SparkJob.PropertiesValue)

    job.sparkJob = spark_job
