# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint: disable=line-too-long
from shlex import split

from knack.util import CLIError
from azure.synapse.spark.models import SparkBatchJobOptions, SparkSessionOptions, SparkStatementOptions
from .._client_factory import cf_synapse_spark_batch, cf_synapse_spark_session
from ..util import categorized_files, check_udfs_folder
from ..constant import DOTNET_CLASS, DOTNET_FILE, SPARK_DOTNET_UDFS_FOLDER_NAME, EXECUTOR_SIZE, \
    SPARK_DOTNET_ASSEMBLY_SEARCH_PATHS_KEY, SparkBatchLanguage


# Spark batch job
def get_spark_batch_job(cmd, job_id, workspace_name, spark_pool_name):
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_batch_job(job_id, detailed=True)


def cancel_spark_batch_job(cmd, job_id, workspace_name, spark_pool_name):
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.cancel_spark_batch_job(job_id)


def list_spark_batch_jobs(cmd, workspace_name, spark_pool_name, from_index=None, size=None):
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_batch_jobs(from_index, size, detailed=True)


def create_spark_batch_job(cmd, workspace_name, spark_pool_name, job_name, main_definition_file,
                           main_class_name, executor_size, executors, language=SparkBatchLanguage.Scala,
                           command_line_arguments=None,
                           reference_files=None, archives=None, configuration=None,
                           tags=None):
    # pylint: disable=too-many-locals
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    file = main_definition_file
    class_name = main_class_name

    # Normalize --arguments: argparse delivers each occurrence as its own token
    # list, e.g. --arguments a b => command_line_arguments == [['a', 'b']].
    # Guard against the default of None before iterating.
    final_command_line_arguments = []
    if command_line_arguments:
        for item in command_line_arguments:
            final_command_line_arguments.append(' '.join(item))
        # A single occurrence with multiple tokens is re-split with shlex so that
        # quoted arguments survive intact.
        if len(command_line_arguments) == 1 and len(command_line_arguments[0]) != 1:
            final_command_line_arguments = split(final_command_line_arguments[0])
    arguments = final_command_line_arguments

    # .NET Spark: Livy launches the .NET driver instead of the user's file; the
    # user's package and entry class are forwarded as the leading arguments.
    if language.upper() == SparkBatchLanguage.SparkDotNet.upper() or language.upper() == SparkBatchLanguage.CSharp.upper():
        file = DOTNET_FILE
        class_name = DOTNET_CLASS

        arguments = [main_definition_file, main_class_name]
        if final_command_line_arguments:
            # Append the normalized arguments; concatenating the raw nested lists
            # would place lists rather than strings in the Livy payload.
            arguments = arguments + final_command_line_arguments

        # Ship the package as an archive so UDF assemblies are extracted into the
        # folder that SPARK_DOTNET_ASSEMBLY_SEARCH_PATHS_KEY points at.
        archives = ["{}#{}".format(main_definition_file, SPARK_DOTNET_UDFS_FOLDER_NAME)] + archives if archives \
            else ["{}#{}".format(main_definition_file, SPARK_DOTNET_UDFS_FOLDER_NAME)]

        if not configuration:
            configuration = {SPARK_DOTNET_ASSEMBLY_SEARCH_PATHS_KEY: './{}'.format(SPARK_DOTNET_UDFS_FOLDER_NAME)}
        else:
            check_udfs_folder(configuration)

    files = None
    jars = None
    if reference_files:
        files, jars = categorized_files(reference_files)

    # The driver and the executors share the same size mapping.
    driver_cores = EXECUTOR_SIZE[executor_size]['Cores']
    driver_memory = EXECUTOR_SIZE[executor_size]['Memory']
    executor_cores = EXECUTOR_SIZE[executor_size]['Cores']
    executor_memory = EXECUTOR_SIZE[executor_size]['Memory']

    spark_batch_job_options = SparkBatchJobOptions(
        tags=tags,
        name=job_name,
        file=file,
        class_name=class_name,
        arguments=arguments,
        jars=jars,
        files=files,
        archives=archives,
        configuration=configuration,
        driver_memory=driver_memory,
        driver_cores=driver_cores,
        executor_memory=executor_memory,
        executor_cores=executor_cores,
        executor_count=executors)

    return client.create_spark_batch_job(spark_batch_job_options, detailed=True)
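

# A sketch of the normalization above (assumes --arguments is registered with
# argparse action='append' and nargs='+', which is wired up outside this module):
#   --arguments a b                  => [['a', 'b']]      => ['a', 'b']
#   --arguments "x y" --arguments z  => [['x y'], ['z']]  => ['x y', 'z']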


# Spark Session
def list_spark_session_jobs(cmd, workspace_name, spark_pool_name, from_index=None, size=None):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_sessions(from_index, size, detailed=True)


def create_spark_session_job(cmd, workspace_name, spark_pool_name, job_name, executor_size, executors,
                             reference_files=None, configuration=None, tags=None):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    files = None
    jars = None
    if reference_files:
        files, jars = categorized_files(reference_files)
    driver_cores = EXECUTOR_SIZE[executor_size]['Cores']
    driver_memory = EXECUTOR_SIZE[executor_size]['Memory']
    executor_cores = EXECUTOR_SIZE[executor_size]['Cores']
    executor_memory = EXECUTOR_SIZE[executor_size]['Memory']

    spark_session_options = SparkSessionOptions(
        tags=tags,
        name=job_name,
        jars=jars,
        files=files,
        configuration=configuration,
        driver_memory=driver_memory,
        driver_cores=driver_cores,
        executor_memory=executor_memory,
        executor_cores=executor_cores,
        executor_count=executors)

    return client.create_spark_session(spark_session_options, detailed=True)


def get_spark_session_job(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_session(session_id, detailed=True)


def cancel_spark_session_job(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.cancel_spark_session(session_id)


def reset_timeout(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.reset_spark_session_timeout(session_id)


# Spark Session Statement
def list_spark_session_statements(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_statements(session_id)
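

# `language` below is passed to the service as the Livy statement `kind`
# (for example 'spark', 'pyspark', or 'sql'); the accepted values are defined
# by the Livy API rather than by this module.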
def create_spark_session_statement(cmd, workspace_name, spark_pool_name, session_id, code, language):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    if not code:
        raise CLIError(
            'Could not read code content from the supplied --code parameter. '
            'It is either empty or an invalid file.')
    spark_statement_options = SparkStatementOptions(code=code, kind=language)
    return client.create_spark_statement(session_id, spark_statement_options)


def get_spark_session_statement(cmd, workspace_name, spark_pool_name, session_id, statement_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_statement(session_id, statement_id)


def cancel_spark_session_statement(cmd, workspace_name, spark_pool_name, session_id, statement_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.cancel_spark_statement(session_id, statement_id)
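

# Illustrative session/statement flow (a sketch, not part of the command surface;
# assumes EXECUTOR_SIZE contains a 'Small' key and that the response models expose
# an `id` attribute, as the azure.synapse.spark models do):
#   session = create_spark_session_job(cmd, ws, pool, 'demo', 'Small', 2)
#   statement = create_spark_session_statement(cmd, ws, pool, session.id, '1 + 1', 'spark')
#   get_spark_session_statement(cmd, ws, pool, session.id, statement.id)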