# -*- coding: utf-8 -*-
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base class for Spark Job."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import argparse

from apitools.base.py import encoding

from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.command_lib.dataproc.jobs import base as job_base


class SparkBase(job_base.JobBase):
  """Submit a Java or Scala Spark job to a cluster."""

  @staticmethod
  def Args(parser):
    """Parses command-line arguments specific to submitting Spark jobs.

    Registers the Spark-specific flags (--jars, --files, --archives,
    --properties, --driver-log-levels) and the trailing positional
    job arguments on the given argparse parser.

    Args:
      parser: The argparse parser (or calliope wrapper) to add flags to.
    """
    parser.add_argument(
        '--jars',
        type=arg_parsers.ArgList(),
        metavar='JAR',
        default=[],
        help=('Comma separated list of jar files to be provided to the '
              'executor and driver classpaths.'))
    parser.add_argument(
        '--files',
        type=arg_parsers.ArgList(),
        metavar='FILE',
        default=[],
        help=('Comma separated list of files to be placed in the working '
              'directory of both the app master and executors.'))
    parser.add_argument(
        '--archives',
        type=arg_parsers.ArgList(),
        metavar='ARCHIVE',
        default=[],
        help=(
            'Comma separated list of archives to be extracted into the working '
            'directory of each executor. '
            'Must be one of the following file formats: .zip, .tar, .tar.gz, '
            'or .tgz.'))
    # REMAINDER captures everything after the last recognized flag so the
    # driver receives its arguments untouched.
    parser.add_argument(
        'job_args',
        nargs=argparse.REMAINDER,
        help='Arguments to pass to the driver.')
    parser.add_argument(
        '--properties',
        type=arg_parsers.ArgDict(),
        metavar='PROPERTY=VALUE',
        help='List of key value pairs to configure Spark. For a list of '
             'available properties, see: '
             'https://spark.apache.org/docs/latest/'
             'configuration.html#available-properties.')
    parser.add_argument(
        '--driver-log-levels',
        type=arg_parsers.ArgDict(),
        metavar='PACKAGE=LEVEL',
        help=('List of package to log4j log level pairs to configure driver '
              'logging. For example: root=FATAL,com.example=INFO'))

  @staticmethod
  def GetFilesByType(args):
    """Returns a dict of files by their type (jars, archives, etc.).

    Args:
      args: Parsed arguments carrying main_jar, jars, archives and files.

    Returns:
      A dict mapping the file-category name used by ConfigureJob to the
      corresponding value(s) from args.
    """
    return {
        'main_jar': args.main_jar,
        'jars': args.jars,
        'archives': args.archives,
        'files': args.files}

  @staticmethod
  def ConfigureJob(messages, job, files_by_type, logging_config, args):
    """Populates the sparkJob member of the given job.

    Args:
      messages: The Dataproc API messages module.
      job: The job message to mutate; its sparkJob field is set in place.
      files_by_type: Dict produced by GetFilesByType.
      logging_config: A LoggingConfig message for the driver, or None.
      args: Parsed arguments carrying job_args, main_class and properties.
    """
    spark_job = messages.SparkJob(
        args=args.job_args or [],
        archiveUris=files_by_type['archives'],
        fileUris=files_by_type['files'],
        jarFileUris=files_by_type['jars'],
        mainClass=args.main_class,
        mainJarFileUri=files_by_type['main_jar'],
        loggingConfig=logging_config)

    # --properties is optional; only attach the message when flags were given.
    if args.properties:
      spark_job.properties = encoding.DictToAdditionalPropertyMessage(
          args.properties, messages.SparkJob.PropertiesValue)

    job.sparkJob = spark_job