/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.internal

import java.util.{NoSuchElementException, Properties}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.immutable

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
////////////////////////////////////////////////////////////////////////////////////////////////////


object SQLConf {

  private val sqlConfEntries = java.util.Collections.synchronizedMap(
    new java.util.HashMap[String, ConfigEntry[_]]())

  private[sql] def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
    require(!sqlConfEntries.containsKey(entry.key),
      s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
    sqlConfEntries.put(entry.key, entry)
  }

  private[sql] object SQLConfigBuilder {

    def apply(key: String): ConfigBuilder = new ConfigBuilder(key).onCreate(register)

  }
  val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
    .internal()
    .doc("The max number of iterations the optimizer and analyzer run.")
    .intConf
    .createWithDefault(100)

  val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
    SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold")
      .internal()
      .doc("The threshold of set size for InSet conversion.")
      .intConf
      .createWithDefault(10)

  val COMPRESS_CACHED = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compressed")
    .internal()
    .doc("When set to true, Spark SQL will automatically select a compression codec for each " +
      "column based on statistics of the data.")
    .booleanConf
    .createWithDefault(true)

  val COLUMN_BATCH_SIZE = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.batchSize")
    .internal()
    .doc("Controls the size of batches for columnar caching.  Larger batch sizes can improve " +
      "memory utilization and compression, but risk OOMs when caching data.")
    .intConf
    .createWithDefault(10000)

  val IN_MEMORY_PARTITION_PRUNING =
    SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning")
      .internal()
      .doc("When true, enable partition pruning for in-memory columnar tables.")
      .booleanConf
      .createWithDefault(true)

  val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin")
    .internal()
    .doc("When true, prefer sort merge join over shuffle hash join.")
    .booleanConf
    .createWithDefault(true)

  val RADIX_SORT_ENABLED = SQLConfigBuilder("spark.sql.sort.enableRadixSort")
    .internal()
    .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " +
      "requires additional memory to be reserved up-front. The memory overhead may be " +
      "significant when sorting very small rows (up to 50% more in this case).")
    .booleanConf
    .createWithDefault(true)

  val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold")
    .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
      "nodes when performing a join.  By setting this value to -1, broadcasting can be disabled. " +
      "Note that currently statistics are only supported for Hive Metastore tables where the " +
      "command <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been " +
      "run, and file-based data source tables where the statistics are computed directly on " +
      "the files of data.")
    .longConf
    .createWithDefault(10L * 1024 * 1024)
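
  // A hedged usage sketch (not part of the config definitions): assuming a SparkSession named
  // `spark`, the statistics command and the -1 sentinel mentioned in the doc above could be
  // exercised like this:
  //   spark.sql("ANALYZE TABLE myTable COMPUTE STATISTICS noscan") // gather table-level stats
  //   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)   // disable broadcast joins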

  val LIMIT_SCALE_UP_FACTOR = SQLConfigBuilder("spark.sql.limit.scaleUpFactor")
    .internal()
    .doc("Minimal increase rate in the number of partitions between attempts when executing a " +
      "take on a query. Higher values lead to more partitions being read. Lower values might " +
      "lead to longer execution times as more jobs will be run.")
    .intConf
    .createWithDefault(4)

  val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
    SQLConfigBuilder("spark.sql.statistics.fallBackToHdfs")
    .doc("When true and table statistics are not available from table metadata, fall back to " +
      "HDFS to estimate the table size. This is useful in determining if a table is small " +
      "enough to use automatic broadcast joins.")
    .booleanConf
    .createWithDefault(false)

  val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
    .internal()
    .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
      "which is larger than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. " +
      "That is to say by default the optimizer will not choose to broadcast a table unless it " +
      "knows for sure its size is small enough.")
    .longConf
    .createWithDefault(Long.MaxValue)

  val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
    .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
    .intConf
    .createWithDefault(200)

  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
    SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
      .doc("The target post-shuffle input size in bytes of a task.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(64 * 1024 * 1024)

  val ADAPTIVE_EXECUTION_ENABLED = SQLConfigBuilder("spark.sql.adaptive.enabled")
    .doc("When true, enable adaptive query execution.")
    .booleanConf
    .createWithDefault(false)

  val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
    SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions")
      .internal()
      .doc("The advisory minimal number of post-shuffle partitions provided to " +
        "ExchangeCoordinator. This setting is used in our tests to make sure we " +
        "have enough parallelism to expose issues that will not be exposed with a " +
        "single partition. When the value is non-positive, this setting will " +
        "not be provided to ExchangeCoordinator.")
      .intConf
      .createWithDefault(-1)

  val SUBEXPRESSION_ELIMINATION_ENABLED =
    SQLConfigBuilder("spark.sql.subexpressionElimination.enabled")
      .internal()
      .doc("When true, common subexpressions will be eliminated.")
      .booleanConf
      .createWithDefault(true)

  val CASE_SENSITIVE = SQLConfigBuilder("spark.sql.caseSensitive")
    .internal()
    .doc("Whether the query analyzer should be case sensitive or not. " +
      "Defaults to case insensitive. It is highly discouraged to turn on case sensitive mode.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
    .doc("When true, the Parquet data source merges schemas collected from all data files, " +
         "otherwise the schema is picked from the summary file or a random data file " +
         "if no summary file is available.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
    .doc("When true, we assume that all part-files of Parquet are consistent with the summary " +
         "files and will ignore the part-files when merging the schema. Otherwise, if this is " +
         "false, which is the default, we will merge all part-files. This should be considered " +
         "an expert-only option, and shouldn't be enabled before knowing exactly what it means.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
    .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
      "Spark SQL, do not differentiate between binary data and strings when writing out the " +
      "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
      "compatibility with these systems.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
    .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
      "Spark would also store Timestamp as INT96 because we need to avoid precision loss of the " +
      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
      "provide compatibility with these systems.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
    .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
    .doc("Sets the compression codec used when writing Parquet files. Acceptable values include: " +
      "uncompressed, snappy, gzip, lzo.")
    .stringConf
    .transform(_.toLowerCase())
    .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
    .createWithDefault("snappy")

  val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
    .doc("Enables Parquet filter push-down optimization when set to true.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
      "Spark SQL schema and vice versa.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
    .doc("The output committer class used by Parquet. The specified class needs to be a " +
      "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass " +
      "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
    .internal()
    .stringConf
    .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")

  val PARQUET_VECTORIZED_READER_ENABLED =
    SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
      .doc("Enables vectorized Parquet decoding.")
      .booleanConf
      .createWithDefault(true)

  val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
    .doc("When true, enable filter pushdown for ORC files.")
    .booleanConf
    .createWithDefault(false)

  val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath")
    .doc("When true, check all the partition paths under the table\'s root directory " +
         "when reading data stored in HDFS.")
    .booleanConf
    .createWithDefault(false)

  val HIVE_METASTORE_PARTITION_PRUNING =
    SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
      .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
           "non-matching partitions can be eliminated earlier. This only affects Hive tables " +
           "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
           "HiveUtils.CONVERT_METASTORE_ORC for more information).")
      .booleanConf
      .createWithDefault(true)

  val HIVE_MANAGE_FILESOURCE_PARTITIONS =
    SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
      .doc("When true, enable metastore partition management for file source tables as well. " +
           "This includes both datasource and converted Hive tables. When partition management " +
           "is enabled, datasource tables store partitions in the Hive metastore, and use the " +
           "metastore to prune partitions during query planning.")
      .booleanConf
      .createWithDefault(true)

  val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
    SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
      .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
           "a cache that can use up to the specified number of bytes for file metadata. This " +
           "conf only has an effect when Hive filesource partition management is enabled.")
      .longConf
      .createWithDefault(250 * 1024 * 1024)

  object HiveCaseSensitiveInferenceMode extends Enumeration {
    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
  }

  val HIVE_CASE_SENSITIVE_INFERENCE = SQLConfigBuilder("spark.sql.hive.caseSensitiveInferenceMode")
    .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
      "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
      "formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
      "any table backed by files containing case-sensitive field names or queries may not return " +
      "accurate results. Valid options include INFER_AND_SAVE (infer the case-sensitive schema " +
      "from the underlying data files and write it back to the table properties), INFER_ONLY " +
      "(infer the schema but don't attempt to write it to the table properties) and " +
      "NEVER_INFER (the default: fall back to using the case-insensitive metastore schema " +
      "instead of inferring).")
    .stringConf
    .transform(_.toUpperCase())
    .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
    .createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)
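
  // A hedged sketch (assumes a SparkSession named `spark`): the inference mode could be switched
  // per session through either the programmatic or the SQL interface, e.g.
  //   spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")
  //   spark.sql("SET spark.sql.hive.caseSensitiveInferenceMode=INFER_AND_SAVE")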

  val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
    .doc("When true, enable the metadata-only query optimization that uses the table's metadata " +
      "to produce the partition columns instead of table scans. It applies when all the columns " +
      "scanned are partition columns and the query has an aggregate operator that satisfies " +
      "distinct semantics.")
    .booleanConf
    .createWithDefault(true)

  val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
    .doc("The name of the internal column for storing raw/unparsed JSON records that fail " +
      "to parse.")
    .stringConf
    .createWithDefault("_corrupt_record")

  val BROADCAST_TIMEOUT = SQLConfigBuilder("spark.sql.broadcastTimeout")
    .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
    .intConf
    .createWithDefault(5 * 60)

  // This is only used for the thriftserver
  val THRIFTSERVER_POOL = SQLConfigBuilder("spark.sql.thriftserver.scheduler.pool")
    .doc("Set a Fair Scheduler pool for a JDBC client session.")
    .stringConf
    .createOptional

  val THRIFTSERVER_INCREMENTAL_COLLECT =
    SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
      .internal()
      .doc("When true, enable incremental collection for execution in Thrift Server.")
      .booleanConf
      .createWithDefault(false)

  val THRIFTSERVER_UI_STATEMENT_LIMIT =
    SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
      .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
      .intConf
      .createWithDefault(200)

  val THRIFTSERVER_UI_SESSION_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedSessions")
    .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
    .intConf
    .createWithDefault(200)

  // This is used to set the default data source
  val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
    .doc("The default data source to use in input/output.")
    .stringConf
    .createWithDefault("parquet")

  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
    .internal()
    .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
      "without specifying any storage property will be converted to a data source table, " +
      "using the data source set by spark.sql.sources.default.")
    .booleanConf
    .createWithDefault(false)

  val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
      .internal()
      .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
        " in parallel while repairing table partitions to avoid the sequential listing in the" +
        " Hive metastore.")
      .booleanConf
      .createWithDefault(true)

  val PARTITION_COLUMN_TYPE_INFERENCE =
    SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
      .doc("When true, automatically infer the data types for partitioned columns.")
      .booleanConf
      .createWithDefault(true)

  val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
    .doc("When false, we will treat bucketed tables as normal tables.")
    .booleanConf
    .createWithDefault(true)

  val CROSS_JOINS_ENABLED = SQLConfigBuilder("spark.sql.crossJoin.enabled")
    .doc("When false, we will throw an error if a query contains a Cartesian product without " +
        "explicit CROSS JOIN syntax.")
    .booleanConf
    .createWithDefault(false)
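
  // A hedged illustration (SQL submitted through a SparkSession, e.g. `spark.sql(...)`): with
  // this flag left at false, an implicit Cartesian product such as
  //   SELECT * FROM t1, t2
  // is rejected at planning time, while the explicit form
  //   SELECT * FROM t1 CROSS JOIN t2
  // is still allowed.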

  val ORDER_BY_ORDINAL = SQLConfigBuilder("spark.sql.orderByOrdinal")
    .doc("When true, the ordinal numbers are treated as the position in the select list. " +
         "When false, the ordinal numbers in order/sort by clauses are ignored.")
    .booleanConf
    .createWithDefault(true)

  val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal")
    .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
      "in the select list. When false, the ordinal numbers are ignored.")
    .booleanConf
    .createWithDefault(true)
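
  // A hedged illustration of the two ordinal flags above (plain Spark SQL, table and columns
  // are made up):
  //   SELECT dept, name, salary FROM employees ORDER BY 3 DESC  -- 3 refers to `salary`
  //   SELECT dept, count(*) FROM employees GROUP BY 1           -- 1 refers to `dept`
  // With the flags set to false, the ordinals are treated as ordinary integer literals instead.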

  // The output committer class used by data sources. The specified class needs to be a
  // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
  val OUTPUT_COMMITTER_CLASS =
    SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional

  val FILE_COMMIT_PROTOCOL_CLASS =
    SQLConfigBuilder("spark.sql.sources.commitProtocolClass")
      .internal()
      .stringConf
      .createWithDefault(
        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")

  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
    SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
      .doc("The maximum number of files allowed for listing files at the driver side. If the " +
        "number of detected files exceeds this value during partition discovery, it tries to " +
        "list the files with another Spark distributed job. This applies to Parquet, ORC, CSV, " +
        "JSON and LibSVM data sources.")
      .intConf
      .createWithDefault(32)

  val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
    SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
      .doc("The degree of parallelism used to list a collection of paths recursively. Set the " +
        "number to prevent file listing from generating too many tasks.")
      .internal()
      .intConf
      .createWithDefault(10000)

  // Whether to automatically resolve ambiguity in join conditions for self-joins.
  // See SPARK-6231.
  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
    SQLConfigBuilder("spark.sql.selfJoinAutoResolveAmbiguity")
      .internal()
      .booleanConf
      .createWithDefault(true)

  // Whether to retain group by columns or not in GroupedData.agg.
  val DATAFRAME_RETAIN_GROUP_COLUMNS = SQLConfigBuilder("spark.sql.retainGroupColumns")
    .internal()
    .booleanConf
    .createWithDefault(true)

  val DATAFRAME_PIVOT_MAX_VALUES = SQLConfigBuilder("spark.sql.pivotMaxValues")
    .doc("When doing a pivot without specifying values for the pivot column, this is the maximum " +
      "number of (distinct) values that will be collected without error.")
    .intConf
    .createWithDefault(10000)
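
  // A hedged sketch of the pivot path this limit guards (standard Dataset API, a DataFrame `df`
  // with these columns is assumed):
  //   df.groupBy("year").pivot("course").sum("earnings")
  // Because no explicit value list is given to pivot(), Spark first collects the distinct values
  // of `course`, and fails if there are more than spark.sql.pivotMaxValues of them.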

  val RUN_SQL_ON_FILES = SQLConfigBuilder("spark.sql.runSQLOnFiles")
    .internal()
    .doc("When true, we can use `datasource`.`path` as a table in a SQL query.")
    .booleanConf
    .createWithDefault(true)
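
  // A hedged example of the `datasource`.`path` form mentioned above (the path is illustrative):
  //   SELECT * FROM parquet.`/some/path/to/table`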

  val WHOLESTAGE_CODEGEN_ENABLED = SQLConfigBuilder("spark.sql.codegen.wholeStage")
    .internal()
    .doc("When true, the whole stage (of multiple operators) will be compiled into a single " +
      "Java method.")
    .booleanConf
    .createWithDefault(true)

  val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields")
    .internal()
    .doc("The maximum number of fields (including nested fields) that will be supported before" +
      " deactivating whole-stage codegen.")
    .intConf
    .createWithDefault(100)

  val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback")
    .internal()
    .doc("When true, whole-stage codegen could be temporarily disabled for the part of a query" +
      " that fails to compile generated code.")
    .booleanConf
    .createWithDefault(true)

  val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches")
    .internal()
    .doc("The maximum number of CASE WHEN branches supported with codegen.")
    .intConf
    .createWithDefault(20)

  val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
    .doc("The maximum number of bytes to pack into a single partition when reading files.")
    .longConf
    .createWithDefault(128 * 1024 * 1024) // parquet.block.size

  val FILES_OPEN_COST_IN_BYTES = SQLConfigBuilder("spark.sql.files.openCostInBytes")
    .internal()
    .doc("The estimated cost to open a file, measured by the number of bytes that could be " +
      "scanned in the same time. This is used when putting multiple files into a partition. " +
      "It's better to over-estimate it; then the partitions with small files will be faster " +
      "than partitions with bigger files (which are scheduled first).")
    .longConf
    .createWithDefault(4 * 1024 * 1024)

  val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
    .internal()
    .doc("When true, the planner will try to find duplicated exchanges and reuse them.")
    .booleanConf
    .createWithDefault(true)

  val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
    SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot")
      .internal()
      .doc("Minimum number of state store delta files that need to be generated before they " +
        "are consolidated into snapshots.")
      .intConf
      .createWithDefault(10)

  val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
    .doc("The default location for storing checkpoint data for streaming queries.")
    .stringConf
    .createOptional

  val MIN_BATCHES_TO_RETAIN = SQLConfigBuilder("spark.sql.streaming.minBatchesToRetain")
    .internal()
    .doc("The minimum number of batches that must be retained and made recoverable.")
    .intConf
    .createWithDefault(100)

  val UNSUPPORTED_OPERATION_CHECK_ENABLED =
    SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
      .internal()
      .doc("When true, the logical plan for a streaming query will be checked for unsupported" +
        " operations.")
      .booleanConf
      .createWithDefault(true)

  val VARIABLE_SUBSTITUTE_ENABLED =
    SQLConfigBuilder("spark.sql.variable.substitute")
      .doc("This enables substitution using syntax like ${var}, ${system:var} and ${env:var}.")
      .booleanConf
      .createWithDefault(true)
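
  // A hedged illustration of the substitution syntax (the variable names and query are made up;
  // a SparkSession named `spark` is assumed):
  //   spark.sql("SET spark.sql.shuffle.partitions=${env:MY_PARTITIONS}")
  //   spark.sql("SELECT * FROM logs WHERE user = '${system:user.name}'")
  // With substitution enabled, the ${...} references are replaced before the statement is parsed.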

  val VARIABLE_SUBSTITUTE_DEPTH =
    SQLConfigBuilder("spark.sql.variable.substitute.depth")
      .internal()
      .doc("Deprecated: the maximum number of replacements the substitution engine will do.")
      .intConf
      .createWithDefault(40)

  val ENABLE_TWOLEVEL_AGG_MAP =
    SQLConfigBuilder("spark.sql.codegen.aggregate.map.twolevel.enable")
      .internal()
      .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
        "inserted/looked-up at a 1st-level, small, fast map, and then fall back to a " +
        "2nd-level, larger, slower map when the 1st level is full or keys cannot be found. " +
        "When disabled, records go directly to the 2nd level. Defaults to true.")
      .booleanConf
      .createWithDefault(true)

  val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
    SQLConfigBuilder("spark.sql.streaming.commitProtocolClass")
      .internal()
      .stringConf
      .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")

  val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
    .internal()
    .doc("Whether to delete the expired log files in the file stream sink.")
    .booleanConf
    .createWithDefault(true)

  val FILE_SINK_LOG_COMPACT_INTERVAL =
    SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactInterval")
      .internal()
      .doc("Number of log files after which all the previous files " +
        "are compacted into the next log file.")
      .intConf
      .createWithDefault(10)

  val FILE_SINK_LOG_CLEANUP_DELAY =
    SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
      .internal()
      .doc("How long in milliseconds a file is guaranteed to be visible to all readers.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes

  val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion")
    .internal()
    .doc("Whether to delete the expired log files in the file stream source.")
    .booleanConf
    .createWithDefault(true)

  val FILE_SOURCE_LOG_COMPACT_INTERVAL =
    SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval")
      .internal()
      .doc("Number of log files after which all the previous files " +
        "are compacted into the next log file.")
      .intConf
      .createWithDefault(10)

  val FILE_SOURCE_LOG_CLEANUP_DELAY =
    SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay")
      .internal()
      .doc("How long in milliseconds a file is guaranteed to be visible to all readers.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes

  val STREAMING_SCHEMA_INFERENCE =
    SQLConfigBuilder("spark.sql.streaming.schemaInference")
      .internal()
      .doc("Whether file-based streaming sources will infer their own schema.")
      .booleanConf
      .createWithDefault(false)

  val STREAMING_POLLING_DELAY =
    SQLConfigBuilder("spark.sql.streaming.pollingDelay")
      .internal()
      .doc("How long to delay polling new data when no data is available.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(10L)

  val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
    SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
      .internal()
      .doc("How long to wait between two progress events when there is no data.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(10000L)

  val STREAMING_METRICS_ENABLED =
    SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
      .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
      .booleanConf
      .createWithDefault(false)

  val STREAMING_PROGRESS_RETENTION =
    SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates")
      .doc("The number of progress updates to retain for a streaming query.")
      .intConf
      .createWithDefault(100)

  val NDV_MAX_ERROR =
    SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
      .internal()
      .doc("The maximum estimation error allowed in the HyperLogLog++ algorithm when " +
        "generating column level statistics.")
      .doubleConf
      .createWithDefault(0.05)

  val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
    .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
      "encountering corrupted or non-existing files, and contents that have been read will " +
      "still be returned.")
    .booleanConf
    .createWithDefault(false)

  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
  }
}

/**
 * A class that enables the setting and getting of mutable config parameters/hints.
 *
 * In the presence of a SQLContext, these can be set and queried by passing SET commands
 * into Spark SQL's query functions (i.e. sql()). Otherwise, users of this class can
 * modify the hints by programmatically calling the setters and getters of this class.
 *
 * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
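 *
 * A hedged usage sketch (assumes a SparkSession named `spark`; these are the standard public
 * entry points rather than anything defined in this file):
 * {{{
 *   spark.sql("SET spark.sql.shuffle.partitions=400")     // via a SET command
 *   spark.conf.set("spark.sql.shuffle.partitions", 400)   // via the programmatic setter
 *   spark.conf.get("spark.sql.shuffle.partitions")        // read the current value
 * }}}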
 */
class SQLConf extends Serializable with Logging {
  import SQLConf._

  /** Only a low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
  @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
    new java.util.HashMap[String, String]())

  @transient private val reader = new ConfigReader(settings)

  /** ************************ Spark SQL Params/Hints ******************* */

  def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)

  def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

  def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)

  def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)

  def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

  def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)

  def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)

  def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)

  def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)

  def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)

  def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)

  def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY)

  def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)

  def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)

  def streamingNoDataProgressEventInterval: Long =
    getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)

  def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)

  def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)

  def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

  def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)

  def useCompression: Boolean = getConf(COMPRESS_CACHED)

  def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)

  def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)

  def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)

  def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)

  def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

  def targetPostShuffleInputSize: Long =
    getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

  def minNumPostShufflePartitions: Int =
    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

  def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

  def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

  def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

  def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

  def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

  def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)

  def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)

  def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
    HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))

  def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

  def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

  def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)

  def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)

  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)

  def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)

  def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)

  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

  /**
   * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
   * identifiers are equal.
   */
  def resolver: Resolver = {
    if (caseSensitiveAnalysis) {
      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
    } else {
      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
    }
  }

  def subexpressionEliminationEnabled: Boolean =
    getConf(SUBEXPRESSION_ELIMINATION_ENABLED)

  def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

  def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)

  def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

  def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)

  def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)

  def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)

  def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)

  def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)

  def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)

  def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)

  def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

  def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

  def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

  def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)

  def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)

  def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

  def convertCTAS: Boolean = getConf(CONVERT_CTAS)

  def partitionColumnTypeInferenceEnabled: Boolean =
    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)

  def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)

  def parallelPartitionDiscoveryThreshold: Int =
    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

  def parallelPartitionDiscoveryParallelism: Int =
    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)

  def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

  def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

  def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)

  def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)

  def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)

  def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

  def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)

  def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)

  def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString

  def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)

  def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

  def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)

  def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

  def ndvMaxError: Double = getConf(NDV_MAX_ERROR)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
  def setConf(props: Properties): Unit = settings.synchronized {
    props.asScala.foreach { case (k, v) => setConfString(k, v) }
  }

  /** Set the given Spark SQL configuration property using a `string` value. */
  def setConfString(key: String, value: String): Unit = {
    require(key != null, "key cannot be null")
    require(value != null, s"value cannot be null for key: $key")
    val entry = sqlConfEntries.get(key)
    if (entry != null) {
      // Only verify configs in the SQLConf object
      entry.valueConverter(value)
    }
    setConfWithCheck(key, value)
  }

  /** Set the given Spark SQL configuration property. */
  def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
    require(entry != null, "entry cannot be null")
    require(value != null, s"value cannot be null for key: ${entry.key}")
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    setConfWithCheck(entry.key, entry.stringConverter(value))
  }

  /** Return the value of Spark SQL configuration property for the given key. */
  @throws[NoSuchElementException]("if key is not set")
  def getConfString(key: String): String = {
    Option(settings.get(key)).
      orElse {
        // Try to use the default value
        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
      }.
      getOrElse(throw new NoSuchElementException(key))
  }

  /**
   * Return the value of Spark SQL configuration property for the given key. If the key is not set
   * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
   * desired one.
   */
  def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
  }

  /**
   * Return the value of Spark SQL configuration property for the given key. If the key is not set
   * yet, return `defaultValue` in [[ConfigEntry]].
   */
  def getConf[T](entry: ConfigEntry[T]): T = {
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    entry.readFrom(reader)
  }

  /**
   * Return the value of an optional Spark SQL configuration property for the given key. If the key
   * is not set yet, returns None.
   */
  def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    entry.readFrom(reader)
  }

  /**
   * Return the `string` value of Spark SQL configuration property for the given key. If the key is
   * not set yet, return `defaultValue`.
   */
  def getConfString(key: String, defaultValue: String): String = {
    val entry = sqlConfEntries.get(key)
    if (entry != null && defaultValue != "<undefined>") {
      // Only verify configs in the SQLConf object
      entry.valueConverter(defaultValue)
    }
    Option(settings.get(key)).getOrElse(defaultValue)
  }

  /**
   * Return all the configuration properties that have been set (i.e. not the default).
   * This creates a new copy of the config properties in the form of a Map.
   */
  def getAllConfs: immutable.Map[String, String] =
    settings.synchronized { settings.asScala.toMap }

  /**
   * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
   * definition contains key, defaultValue and doc.
   */
  def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
    sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
      (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
    }.toSeq
  }

  /**
   * Return whether a given key is set in this [[SQLConf]].
   */
  def contains(key: String): Boolean = {
    settings.containsKey(key)
  }

  private def setConfWithCheck(key: String, value: String): Unit = {
    settings.put(key, value)
  }

  def unsetConf(key: String): Unit = {
    settings.remove(key)
  }

  def unsetConf(entry: ConfigEntry[_]): Unit = {
    settings.remove(entry.key)
  }

  def clear(): Unit = {
    settings.clear()
  }
}