# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint: disable=line-too-long
from shlex import split

from knack.util import CLIError
from azure.synapse.spark.models import SparkBatchJobOptions, SparkSessionOptions, SparkStatementOptions
from .._client_factory import cf_synapse_spark_batch, cf_synapse_spark_session
from ..util import categorized_files, check_udfs_folder
from ..constant import DOTNET_CLASS, DOTNET_FILE, SPARK_DOTNET_UDFS_FOLDER_NAME, EXECUTOR_SIZE, \
    SPARK_DOTNET_ASSEMBLY_SEARCH_PATHS_KEY, SparkBatchLanguage
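
# Custom command implementations for Synapse Spark batch jobs, sessions and session
# statements. Each function builds a data-plane client scoped to the target workspace
# and Spark pool via the client factory, then calls into the azure-synapse SDK.
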
# Spark batch job
def get_spark_batch_job(cmd, job_id, workspace_name, spark_pool_name):
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_batch_job(job_id, detailed=True)


def cancel_spark_batch_job(cmd, job_id, workspace_name, spark_pool_name):
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.cancel_spark_batch_job(job_id)


def list_spark_batch_jobs(cmd, workspace_name, spark_pool_name, from_index=None, size=None):
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_batch_jobs(from_index, size, detailed=True)


def create_spark_batch_job(cmd, workspace_name, spark_pool_name, job_name, main_definition_file,
                           main_class_name, executor_size, executors, language=SparkBatchLanguage.Scala,
                           command_line_arguments=None,
                           reference_files=None, archives=None, configuration=None,
                           tags=None):
    # pylint: disable-msg=too-many-locals
    client = cf_synapse_spark_batch(cmd.cli_ctx, workspace_name, spark_pool_name)
    file = main_definition_file
    class_name = main_class_name

    # Normalize --arguments: the CLI layer hands them over as a list of token lists,
    # e.g. `--arguments a b` arrives as [['a', 'b']]; a single occurrence with more
    # than one token is joined and re-split with shlex into a flat argument list.
    final_command_line_arguments = []
    if command_line_arguments:
        for item in command_line_arguments:
            final_command_line_arguments.append(' '.join(item))
        if len(command_line_arguments) == 1 and len(command_line_arguments[0]) != 1:
            final_command_line_arguments = split(final_command_line_arguments[0])
    arguments = final_command_line_arguments
    # dotnet spark
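    # For .NET Spark (SparkDotNet/CSharp) the submission is assumed to be routed
    # through the Spark .NET launcher: DOTNET_FILE and DOTNET_CLASS (from ..constant)
    # name the launcher file and entry class, while the user's package and class are
    # passed as the leading arguments. The package is also attached as an archive so
    # its UDF assemblies are extracted where the configured assembly search path
    # expects them.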
    if language.upper() == SparkBatchLanguage.SparkDotNet.upper() or language.upper() == SparkBatchLanguage.CSharp.upper():
        file = DOTNET_FILE
        class_name = DOTNET_CLASS

        arguments = [main_definition_file, main_class_name]
        if final_command_line_arguments:
            arguments = arguments + final_command_line_arguments

        udfs_archive = "{}#{}".format(main_definition_file, SPARK_DOTNET_UDFS_FOLDER_NAME)
        archives = [udfs_archive] + archives if archives else [udfs_archive]

        if not configuration:
            configuration = {SPARK_DOTNET_ASSEMBLY_SEARCH_PATHS_KEY: './{}'.format(SPARK_DOTNET_UDFS_FOLDER_NAME)}
        else:
            check_udfs_folder(configuration)

    files = None
    jars = None
    if reference_files:
        # categorized_files (from ..util) is expected to split --reference-files into
        # plain files and jar dependencies.
        files, jars = categorized_files(reference_files)
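    # EXECUTOR_SIZE (from ..constant) is assumed to map the --executor-size choice
    # (e.g. Small/Medium/Large) to core/memory figures; the driver and the executors
    # are given the same size here.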
    driver_cores = EXECUTOR_SIZE[executor_size]['Cores']
    driver_memory = EXECUTOR_SIZE[executor_size]['Memory']
    executor_cores = EXECUTOR_SIZE[executor_size]['Cores']
    executor_memory = EXECUTOR_SIZE[executor_size]['Memory']

    spark_batch_job_options = SparkBatchJobOptions(
        tags=tags,
        name=job_name,
        file=file,
        class_name=class_name,
        arguments=arguments,
        jars=jars,
        files=files,
        archives=archives,
        configuration=configuration,
        driver_memory=driver_memory,
        driver_cores=driver_cores,
        executor_memory=executor_memory,
        executor_cores=executor_cores,
        executor_count=executors)

    return client.create_spark_batch_job(spark_batch_job_options, detailed=True)


# Spark Session
def list_spark_session_jobs(cmd, workspace_name, spark_pool_name, from_index=None, size=None):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_sessions(from_index, size, detailed=True)


def create_spark_session_job(cmd, workspace_name, spark_pool_name, job_name, executor_size, executors,
                             reference_files=None, configuration=None, tags=None):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    files = None
    jars = None
    if reference_files:
        files, jars = categorized_files(reference_files)
    driver_cores = EXECUTOR_SIZE[executor_size]['Cores']
    driver_memory = EXECUTOR_SIZE[executor_size]['Memory']
    executor_cores = EXECUTOR_SIZE[executor_size]['Cores']
    executor_memory = EXECUTOR_SIZE[executor_size]['Memory']

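    # Note: SparkSessionOptions takes the Spark configuration via `conf`, whereas
    # SparkBatchJobOptions above takes it via `configuration`.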
    spark_session_options = SparkSessionOptions(
        tags=tags,
        name=job_name,
        jars=jars,
        files=files,
        conf=configuration,
        driver_memory=driver_memory,
        driver_cores=driver_cores,
        executor_memory=executor_memory,
        executor_cores=executor_cores,
        executor_count=executors)

    return client.create_spark_session(spark_session_options, detailed=True)


def get_spark_session_job(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_session(session_id, detailed=True)


def cancel_spark_session_job(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.cancel_spark_session(session_id)


def reset_timeout(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.reset_spark_session_timeout(session_id)


# Spark Session Statement
def list_spark_session_statements(cmd, workspace_name, spark_pool_name, session_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_statements(session_id)


def create_spark_session_statement(cmd, workspace_name, spark_pool_name, session_id, code, language):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    if not code:
        raise CLIError(
            'Could not read code content from the supplied --code parameter. It is either empty or an invalid file.')
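    # `kind` carries the statement language to the Livy-style statement API; the
    # --language value is assumed to already be normalized to a supported kind.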
    spark_statement_options = SparkStatementOptions(code=code, kind=language)
    return client.create_spark_statement(session_id, spark_statement_options)


def get_spark_session_statement(cmd, workspace_name, spark_pool_name, session_id, statement_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.get_spark_statement(session_id, statement_id)


def cancel_spark_session_statement(cmd, workspace_name, spark_pool_name, session_id, statement_id):
    client = cf_synapse_spark_session(cmd.cli_ctx, workspace_name, spark_pool_name)
    return client.cancel_spark_statement(session_id, statement_id)