/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.internal

import java.util.{NoSuchElementException, Properties}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.immutable

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
////////////////////////////////////////////////////////////////////////////////////////////////////


object SQLConf {

  private val sqlConfEntries = java.util.Collections.synchronizedMap(
    new java.util.HashMap[String, ConfigEntry[_]]())

  private[sql] def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
    require(!sqlConfEntries.containsKey(entry.key),
      s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
    sqlConfEntries.put(entry.key, entry)
  }

  private[sql] object SQLConfigBuilder {

    def apply(key: String): ConfigBuilder = new ConfigBuilder(key).onCreate(register)

  }
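
  // Every entry below follows the same pattern: SQLConfigBuilder registers the entry on
  // creation, and the doc/type/default calls shape the resulting ConfigEntry. A minimal
  // sketch of the builder chain (the key "spark.sql.example.enabled" is hypothetical,
  // not a real configuration):
  //
  //   val EXAMPLE_ENABLED = SQLConfigBuilder("spark.sql.example.enabled")
  //     .internal()                    // hide the entry from getAllDefinedConfs
  //     .doc("A hypothetical flag, shown only to illustrate the builder chain.")
  //     .booleanConf                   // pick the value type
  //     .createWithDefault(false)      // finalize into a ConfigEntry[Boolean]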

  val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
    .internal()
    .doc("The max number of iterations the optimizer and analyzer run.")
    .intConf
    .createWithDefault(100)

  val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
    SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold")
      .internal()
      .doc("The threshold of set size for InSet conversion.")
      .intConf
      .createWithDefault(10)

  val COMPRESS_CACHED = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compressed")
    .internal()
    .doc("When set to true, Spark SQL will automatically select a compression codec for each " +
      "column based on statistics of the data.")
    .booleanConf
    .createWithDefault(true)

  val COLUMN_BATCH_SIZE = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.batchSize")
    .internal()
    .doc("Controls the size of batches for columnar caching. Larger batch sizes can improve " +
      "memory utilization and compression, but risk OOMs when caching data.")
    .intConf
    .createWithDefault(10000)

  val IN_MEMORY_PARTITION_PRUNING =
    SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning")
      .internal()
      .doc("When true, enable partition pruning for in-memory columnar tables.")
      .booleanConf
      .createWithDefault(true)

  val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin")
    .internal()
    .doc("When true, prefer sort merge join over shuffle hash join.")
    .booleanConf
    .createWithDefault(true)

  val RADIX_SORT_ENABLED = SQLConfigBuilder("spark.sql.sort.enableRadixSort")
    .internal()
    .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " +
      "requires additional memory to be reserved up-front. The memory overhead may be " +
      "significant when sorting very small rows (up to 50% more in this case).")
    .booleanConf
    .createWithDefault(true)

  val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold")
    .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
      "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
      "Note that currently statistics are only supported for Hive Metastore tables where the " +
      "command <code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been " +
      "run, and file-based data source tables where the statistics are computed directly on " +
      "the files of data.")
    .longConf
    .createWithDefault(10L * 1024 * 1024)
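
  // For example, a user can raise the threshold to 100 MB, or disable broadcast joins
  // entirely, through the public session configuration API (a usage sketch, not code that
  // belongs in this file):
  //
  //   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)
  //   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  // disable broadcasting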
" + 132 "That is to say by default the optimizer will not choose to broadcast a table unless it " + 133 "knows for sure its size is small enough.") 134 .longConf 135 .createWithDefault(Long.MaxValue) 136 137 val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions") 138 .doc("The default number of partitions to use when shuffling data for joins or aggregations.") 139 .intConf 140 .createWithDefault(200) 141 142 val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE = 143 SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize") 144 .doc("The target post-shuffle input size in bytes of a task.") 145 .bytesConf(ByteUnit.BYTE) 146 .createWithDefault(64 * 1024 * 1024) 147 148 val ADAPTIVE_EXECUTION_ENABLED = SQLConfigBuilder("spark.sql.adaptive.enabled") 149 .doc("When true, enable adaptive query execution.") 150 .booleanConf 151 .createWithDefault(false) 152 153 val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = 154 SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions") 155 .internal() 156 .doc("The advisory minimal number of post-shuffle partitions provided to " + 157 "ExchangeCoordinator. This setting is used in our test to make sure we " + 158 "have enough parallelism to expose issues that will not be exposed with a " + 159 "single partition. When the value is a non-positive value, this setting will " + 160 "not be provided to ExchangeCoordinator.") 161 .intConf 162 .createWithDefault(-1) 163 164 val SUBEXPRESSION_ELIMINATION_ENABLED = 165 SQLConfigBuilder("spark.sql.subexpressionElimination.enabled") 166 .internal() 167 .doc("When true, common subexpressions will be eliminated.") 168 .booleanConf 169 .createWithDefault(true) 170 171 val CASE_SENSITIVE = SQLConfigBuilder("spark.sql.caseSensitive") 172 .internal() 173 .doc("Whether the query analyzer should be case sensitive or not. " + 174 "Default to case insensitive. It is highly discouraged to turn on case sensitive mode.") 175 .booleanConf 176 .createWithDefault(false) 177 178 val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema") 179 .doc("When true, the Parquet data source merges schemas collected from all data files, " + 180 "otherwise the schema is picked from the summary file or a random data file " + 181 "if no summary file is available.") 182 .booleanConf 183 .createWithDefault(false) 184 185 val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles") 186 .doc("When true, we make assumption that all part-files of Parquet are consistent with " + 187 "summary files and we will ignore them when merging schema. Otherwise, if this is " + 188 "false, which is the default, we will merge all part-files. This should be considered " + 189 "as expert-only option, and shouldn't be enabled before knowing what it means exactly.") 190 .booleanConf 191 .createWithDefault(false) 192 193 val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString") 194 .doc("Some other Parquet-producing systems, in particular Impala and older versions of " + 195 "Spark SQL, do not differentiate between binary data and strings when writing out the " + 196 "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " + 197 "compatibility with these systems.") 198 .booleanConf 199 .createWithDefault(false) 200 201 val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp") 202 .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. 
" + 203 "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " + 204 "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " + 205 "provide compatibility with these systems.") 206 .booleanConf 207 .createWithDefault(true) 208 209 val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata") 210 .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.") 211 .booleanConf 212 .createWithDefault(true) 213 214 val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec") 215 .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " + 216 "uncompressed, snappy, gzip, lzo.") 217 .stringConf 218 .transform(_.toLowerCase()) 219 .checkValues(Set("uncompressed", "snappy", "gzip", "lzo")) 220 .createWithDefault("snappy") 221 222 val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown") 223 .doc("Enables Parquet filter push-down optimization when set to true.") 224 .booleanConf 225 .createWithDefault(true) 226 227 val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat") 228 .doc("Whether to follow Parquet's format specification when converting Parquet schema to " + 229 "Spark SQL schema and vice versa.") 230 .booleanConf 231 .createWithDefault(false) 232 233 val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class") 234 .doc("The output committer class used by Parquet. The specified class needs to be a " + 235 "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " + 236 "of org.apache.parquet.hadoop.ParquetOutputCommitter.") 237 .internal() 238 .stringConf 239 .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter") 240 241 val PARQUET_VECTORIZED_READER_ENABLED = 242 SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader") 243 .doc("Enables vectorized parquet decoding.") 244 .booleanConf 245 .createWithDefault(true) 246 247 val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown") 248 .doc("When true, enable filter pushdown for ORC files.") 249 .booleanConf 250 .createWithDefault(false) 251 252 val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath") 253 .doc("When true, check all the partition paths under the table\'s root directory " + 254 "when reading data stored in HDFS.") 255 .booleanConf 256 .createWithDefault(false) 257 258 val HIVE_METASTORE_PARTITION_PRUNING = 259 SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning") 260 .doc("When true, some predicates will be pushed down into the Hive metastore so that " + 261 "unmatching partitions can be eliminated earlier. This only affects Hive tables " + 262 "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " + 263 "HiveUtils.CONVERT_METASTORE_ORC for more information).") 264 .booleanConf 265 .createWithDefault(true) 266 267 val HIVE_MANAGE_FILESOURCE_PARTITIONS = 268 SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions") 269 .doc("When true, enable metastore partition management for file source tables as well. " + 270 "This includes both datasource and converted Hive tables. 

  val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
    .doc("Enables Parquet filter push-down optimization when set to true.")
    .booleanConf
    .createWithDefault(true)

  val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
      "Spark SQL schema and vice versa.")
    .booleanConf
    .createWithDefault(false)

  val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
    .doc("The output committer class used by Parquet. The specified class needs to be a " +
      "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
      "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
    .internal()
    .stringConf
    .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")

  val PARQUET_VECTORIZED_READER_ENABLED =
    SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
      .doc("Enables vectorized parquet decoding.")
      .booleanConf
      .createWithDefault(true)

  val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
    .doc("When true, enable filter pushdown for ORC files.")
    .booleanConf
    .createWithDefault(false)

  val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath")
    .doc("When true, check all the partition paths under the table's root directory " +
      "when reading data stored in HDFS.")
    .booleanConf
    .createWithDefault(false)

  val HIVE_METASTORE_PARTITION_PRUNING =
    SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
      .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
        "non-matching partitions can be eliminated earlier. This only affects Hive tables " +
        "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
        "HiveUtils.CONVERT_METASTORE_ORC for more information).")
      .booleanConf
      .createWithDefault(true)

  val HIVE_MANAGE_FILESOURCE_PARTITIONS =
    SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
      .doc("When true, enable metastore partition management for file source tables as well. " +
        "This includes both datasource and converted Hive tables. When partition management " +
        "is enabled, datasource tables store partitions in the Hive metastore, and use the " +
        "metastore to prune partitions during query planning.")
      .booleanConf
      .createWithDefault(true)

  val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
    SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
      .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
        "a cache that can use up to the specified number of bytes for file metadata. This conf " +
        "only has an effect when hive filesource partition management is enabled.")
      .longConf
      .createWithDefault(250 * 1024 * 1024)

  object HiveCaseSensitiveInferenceMode extends Enumeration {
    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
  }

  val HIVE_CASE_SENSITIVE_INFERENCE = SQLConfigBuilder("spark.sql.hive.caseSensitiveInferenceMode")
    .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
      "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible " +
      "file formats such as Parquet are. Spark SQL must use a case-preserving schema when " +
      "querying any table backed by files containing case-sensitive field names or queries " +
      "may not return accurate results. Valid options include INFER_AND_SAVE (infer the " +
      "case-sensitive schema from the underlying data files and write it back to the table " +
      "properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
      "properties) and NEVER_INFER (the default; fall back to using the case-insensitive " +
      "metastore schema instead of inferring).")
    .stringConf
    .transform(_.toUpperCase())
    .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
    .createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)
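
  // The entry stores the mode as an upper-cased string; callers recover the enum value with
  // Enumeration.withName, as the caseSensitiveInferenceMode getter below does. A round-trip
  // sketch (illustrative only):
  //
  //   val conf = new SQLConf
  //   conf.setConfString("spark.sql.hive.caseSensitiveInferenceMode", "infer_only")
  //   // transform(_.toUpperCase()) stores "INFER_ONLY", so withName resolves the enum:
  //   HiveCaseSensitiveInferenceMode.withName(conf.getConf(HIVE_CASE_SENSITIVE_INFERENCE))
  //   // => HiveCaseSensitiveInferenceMode.INFER_ONLY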

  val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
    .doc("When true, enable the metadata-only query optimization that uses the table's metadata " +
      "to produce the partition columns instead of table scans. It applies when all the columns " +
      "scanned are partition columns and the query has an aggregate operator that satisfies " +
      "distinct semantics.")
    .booleanConf
    .createWithDefault(true)

  val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
    .doc("The name of the internal column for storing raw/un-parsed JSON records that fail " +
      "to parse.")
    .stringConf
    .createWithDefault("_corrupt_record")

  val BROADCAST_TIMEOUT = SQLConfigBuilder("spark.sql.broadcastTimeout")
    .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
    .intConf
    .createWithDefault(5 * 60)

  // This is only used for the thriftserver
  val THRIFTSERVER_POOL = SQLConfigBuilder("spark.sql.thriftserver.scheduler.pool")
    .doc("Set a Fair Scheduler pool for a JDBC client session.")
    .stringConf
    .createOptional

  val THRIFTSERVER_INCREMENTAL_COLLECT =
    SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
      .internal()
      .doc("When true, enable incremental collection for execution in Thrift Server.")
      .booleanConf
      .createWithDefault(false)

  val THRIFTSERVER_UI_STATEMENT_LIMIT =
    SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
      .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
      .intConf
      .createWithDefault(200)

  val THRIFTSERVER_UI_SESSION_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedSessions")
    .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
    .intConf
    .createWithDefault(200)

  // This is used to set the default data source
  val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
    .doc("The default data source to use in input/output.")
    .stringConf
    .createWithDefault("parquet")

  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
    .internal()
    .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
      "without specifying any storage property will be converted to a data source table, " +
      "using the data source set by spark.sql.sources.default.")
    .booleanConf
    .createWithDefault(false)

  val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
    .internal()
    .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
      " in parallel while repairing table partitions to avoid the sequential listing in Hive" +
      " metastore.")
    .booleanConf
    .createWithDefault(true)

  val PARTITION_COLUMN_TYPE_INFERENCE =
    SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
      .doc("When true, automatically infer the data types for partitioned columns.")
      .booleanConf
      .createWithDefault(true)

  val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
    .doc("When false, we will treat bucketed tables as normal tables.")
    .booleanConf
    .createWithDefault(true)

  val CROSS_JOINS_ENABLED = SQLConfigBuilder("spark.sql.crossJoin.enabled")
    .doc("When false, we will throw an error if a query contains a Cartesian product without " +
      "explicit CROSS JOIN syntax.")
    .booleanConf
    .createWithDefault(false)

  val ORDER_BY_ORDINAL = SQLConfigBuilder("spark.sql.orderByOrdinal")
    .doc("When true, the ordinal numbers are treated as the position in the select list. " +
      "When false, the ordinal numbers in order/sort by clause are ignored.")
    .booleanConf
    .createWithDefault(true)

  val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal")
    .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
      "in the select list. When false, the ordinal numbers are ignored.")
    .booleanConf
    .createWithDefault(true)
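
  // With both flags at their defaults, ordinals refer to select-list positions. In an
  // illustrative query (not code in this file) such as:
  //
  //   SELECT dept, count(*) FROM employees GROUP BY 1 ORDER BY 2 DESC
  //
  // GROUP BY 1 resolves to "dept" and ORDER BY 2 to "count(*)"; turning the flags off makes
  // the analyzer ignore the ordinals instead.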
" + 386 "When false, the ordinal numbers in order/sort by clause are ignored.") 387 .booleanConf 388 .createWithDefault(true) 389 390 val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal") 391 .doc("When true, the ordinal numbers in group by clauses are treated as the position " + 392 "in the select list. When false, the ordinal numbers are ignored.") 393 .booleanConf 394 .createWithDefault(true) 395 396 // The output committer class used by data sources. The specified class needs to be a 397 // subclass of org.apache.hadoop.mapreduce.OutputCommitter. 398 val OUTPUT_COMMITTER_CLASS = 399 SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional 400 401 val FILE_COMMIT_PROTOCOL_CLASS = 402 SQLConfigBuilder("spark.sql.sources.commitProtocolClass") 403 .internal() 404 .stringConf 405 .createWithDefault( 406 "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") 407 408 val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = 409 SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold") 410 .doc("The maximum number of files allowed for listing files at driver side. If the number " + 411 "of detected files exceeds this value during partition discovery, it tries to list the " + 412 "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " + 413 "LibSVM data sources.") 414 .intConf 415 .createWithDefault(32) 416 417 val PARALLEL_PARTITION_DISCOVERY_PARALLELISM = 418 SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism") 419 .doc("The number of parallelism to list a collection of path recursively, Set the " + 420 "number to prevent file listing from generating too many tasks.") 421 .internal() 422 .intConf 423 .createWithDefault(10000) 424 425 // Whether to automatically resolve ambiguity in join conditions for self-joins. 426 // See SPARK-6231. 427 val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = 428 SQLConfigBuilder("spark.sql.selfJoinAutoResolveAmbiguity") 429 .internal() 430 .booleanConf 431 .createWithDefault(true) 432 433 // Whether to retain group by columns or not in GroupedData.agg. 

  // Whether to retain group by columns or not in GroupedData.agg.
  val DATAFRAME_RETAIN_GROUP_COLUMNS = SQLConfigBuilder("spark.sql.retainGroupColumns")
    .internal()
    .booleanConf
    .createWithDefault(true)

  val DATAFRAME_PIVOT_MAX_VALUES = SQLConfigBuilder("spark.sql.pivotMaxValues")
    .doc("When doing a pivot without specifying values for the pivot column this is the maximum " +
      "number of (distinct) values that will be collected without error.")
    .intConf
    .createWithDefault(10000)

  val RUN_SQL_ON_FILES = SQLConfigBuilder("spark.sql.runSQLOnFiles")
    .internal()
    .doc("When true, we could use `datasource`.`path` as a table in SQL queries.")
    .booleanConf
    .createWithDefault(true)

  val WHOLESTAGE_CODEGEN_ENABLED = SQLConfigBuilder("spark.sql.codegen.wholeStage")
    .internal()
    .doc("When true, the whole stage (of multiple operators) will be compiled into a single " +
      "Java method.")
    .booleanConf
    .createWithDefault(true)

  val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields")
    .internal()
    .doc("The maximum number of fields (including nested fields) that will be supported before" +
      " deactivating whole-stage codegen.")
    .intConf
    .createWithDefault(100)

  val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback")
    .internal()
    .doc("When true, whole stage codegen could be temporarily disabled for the part of a query " +
      "that fails to compile generated code.")
    .booleanConf
    .createWithDefault(true)

  val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches")
    .internal()
    .doc("The maximum number of switches supported with codegen.")
    .intConf
    .createWithDefault(20)

  val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
    .doc("The maximum number of bytes to pack into a single partition when reading files.")
    .longConf
    .createWithDefault(128 * 1024 * 1024) // parquet.block.size

  val FILES_OPEN_COST_IN_BYTES = SQLConfigBuilder("spark.sql.files.openCostInBytes")
    .internal()
    .doc("The estimated cost to open a file, measured by the number of bytes that could be " +
      "scanned in the same time. This is used when putting multiple files into a partition. " +
      "It is better to overestimate; partitions with small files will then be faster than " +
      "partitions with bigger files (which are scheduled first).")
    .longConf
    .createWithDefault(4 * 1024 * 1024)
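
  // A worked sketch of how these two entries interact, assuming the file-packing strategy
  // that pads each file's size with the open cost before bin-packing: with the defaults
  // above, a file is charged size + 4 MB, and files are packed into partitions of at most
  // 128 MB. So 100 KB files are charged ~4.1 MB apiece, yielding roughly 31 files per
  // partition rather than the 1000+ that raw sizes alone would allow.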

  val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
    .internal()
    .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
    .booleanConf
    .createWithDefault(true)

  val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
    SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot")
      .internal()
      .doc("Minimum number of state store delta files that need to be generated before they " +
        "are consolidated into snapshots.")
      .intConf
      .createWithDefault(10)

  val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
    .doc("The default location for storing checkpoint data for streaming queries.")
    .stringConf
    .createOptional

  val MIN_BATCHES_TO_RETAIN = SQLConfigBuilder("spark.sql.streaming.minBatchesToRetain")
    .internal()
    .doc("The minimum number of batches that must be retained and made recoverable.")
    .intConf
    .createWithDefault(100)

  val UNSUPPORTED_OPERATION_CHECK_ENABLED =
    SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck")
      .internal()
      .doc("When true, the logical plan for streaming query will be checked for unsupported" +
        " operations.")
      .booleanConf
      .createWithDefault(true)

  val VARIABLE_SUBSTITUTE_ENABLED =
    SQLConfigBuilder("spark.sql.variable.substitute")
      .doc("This enables substitution using syntax like ${var}, ${system:var} and ${env:var}.")
      .booleanConf
      .createWithDefault(true)

  val VARIABLE_SUBSTITUTE_DEPTH =
    SQLConfigBuilder("spark.sql.variable.substitute.depth")
      .internal()
      .doc("Deprecated: The maximum replacements the substitution engine will do.")
      .intConf
      .createWithDefault(40)

  val ENABLE_TWOLEVEL_AGG_MAP =
    SQLConfigBuilder("spark.sql.codegen.aggregate.map.twolevel.enable")
      .internal()
      .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
        "inserted/looked-up at a 1st-level, small, fast map, and then fall back to a " +
        "2nd-level, larger, slower map when the 1st level is full or keys cannot be found. " +
        "When disabled, records go directly to the 2nd level. Defaults to true.")
      .booleanConf
      .createWithDefault(true)

  val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
    SQLConfigBuilder("spark.sql.streaming.commitProtocolClass")
      .internal()
      .stringConf
      .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")

  val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
    .internal()
    .doc("Whether to delete the expired log files in file stream sink.")
    .booleanConf
    .createWithDefault(true)

  val FILE_SINK_LOG_COMPACT_INTERVAL =
    SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactInterval")
      .internal()
      .doc("Number of log files after which all the previous files " +
        "are compacted into the next log file.")
      .intConf
      .createWithDefault(10)

  val FILE_SINK_LOG_CLEANUP_DELAY =
    SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
      .internal()
      .doc("How long a file is guaranteed to be visible to all readers.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
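
  // timeConf entries accept duration strings as well as raw numbers, following Spark's
  // time-string conventions (e.g. "5m" and "300s" both denote five minutes). A sketch of
  // overriding the delay above (illustrative only):
  //
  //   conf.setConfString("spark.sql.streaming.fileSink.log.cleanupDelay", "5m")
  //   conf.getConf(FILE_SINK_LOG_CLEANUP_DELAY)  // => 300000 (milliseconds)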

  val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion")
    .internal()
    .doc("Whether to delete the expired log files in file stream source.")
    .booleanConf
    .createWithDefault(true)

  val FILE_SOURCE_LOG_COMPACT_INTERVAL =
    SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval")
      .internal()
      .doc("Number of log files after which all the previous files " +
        "are compacted into the next log file.")
      .intConf
      .createWithDefault(10)

  val FILE_SOURCE_LOG_CLEANUP_DELAY =
    SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay")
      .internal()
      .doc("How long in milliseconds a file is guaranteed to be visible to all readers.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes

  val STREAMING_SCHEMA_INFERENCE =
    SQLConfigBuilder("spark.sql.streaming.schemaInference")
      .internal()
      .doc("Whether file-based streaming sources will infer their own schema.")
      .booleanConf
      .createWithDefault(false)

  val STREAMING_POLLING_DELAY =
    SQLConfigBuilder("spark.sql.streaming.pollingDelay")
      .internal()
      .doc("How long to delay polling new data when no data is available.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(10L)

  val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
    SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
      .internal()
      .doc("How long to wait between two progress events when there is no data.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefault(10000L)

  val STREAMING_METRICS_ENABLED =
    SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
      .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
      .booleanConf
      .createWithDefault(false)

  val STREAMING_PROGRESS_RETENTION =
    SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates")
      .doc("The number of progress updates to retain for a streaming query.")
      .intConf
      .createWithDefault(100)

  val NDV_MAX_ERROR =
    SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
      .internal()
      .doc("The maximum estimation error allowed in HyperLogLog++ algorithm when generating " +
        "column level statistics.")
      .doubleConf
      .createWithDefault(0.05)

  val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
    .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
      "encountering corrupted or non-existing files, and contents that have been read will " +
      "still be returned.")
    .booleanConf
    .createWithDefault(false)

  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
  }
}

/**
 * A class that enables the setting and getting of mutable config parameters/hints.
 *
 * In the presence of a SQLContext, these can be set and queried by passing SET commands
 * into Spark SQL's query functions (i.e. sql()). Otherwise, users of this class can
 * modify the hints by programmatically calling the setters and getters of this class.
 *
 * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
 */
class SQLConf extends Serializable with Logging {
  import SQLConf._

  /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
  @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
    new java.util.HashMap[String, String]())

  @transient private val reader = new ConfigReader(settings)

  /** ************************ Spark SQL Params/Hints ******************* */

  def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)

  def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

  def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)

  def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)

  def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

  def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)

  def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)

  def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)

  def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)

  def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)

  def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)

  def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY)

  def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)

  def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)

  def streamingNoDataProgressEventInterval: Long =
    getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)

  def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)

  def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)

  def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

  def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)

  def useCompression: Boolean = getConf(COMPRESS_CACHED)

  def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)

  def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)

  def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)

  def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)

  def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

  def targetPostShuffleInputSize: Long =
    getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

  def minNumPostShufflePartitions: Int =
    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

  def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

  def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

  def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

  def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

  def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

  def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)

  def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)

  def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
    HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))

  def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

  def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

  def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)

  def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)

  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)

  def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)

  def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)

  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

  /**
   * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
   * identifiers are equal.
   */
  def resolver: Resolver = {
    if (caseSensitiveAnalysis) {
      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
    } else {
      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
    }
  }

  def subexpressionEliminationEnabled: Boolean =
    getConf(SUBEXPRESSION_ELIMINATION_ENABLED)

  def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

  def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)

  def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

  def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)

  def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)

  def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)

  def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)

  def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)

  def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)

  def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)

  def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

  def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

  def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

  def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)

  def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)

  def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

  def convertCTAS: Boolean = getConf(CONVERT_CTAS)

  def partitionColumnTypeInferenceEnabled: Boolean =
    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)

  def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)

  def parallelPartitionDiscoveryThreshold: Int =
    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

  def parallelPartitionDiscoveryParallelism: Int =
    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)

  def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

  def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

  def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)

  def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)

  def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)

  def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

  def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)

  def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)

  def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString

  def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)

  def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

  def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)

  def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

  def ndvMaxError: Double = getConf(NDV_MAX_ERROR)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
  def setConf(props: Properties): Unit = settings.synchronized {
    props.asScala.foreach { case (k, v) => setConfString(k, v) }
  }

  /** Set the given Spark SQL configuration property using a `string` value. */
  def setConfString(key: String, value: String): Unit = {
    require(key != null, "key cannot be null")
    require(value != null, s"value cannot be null for key: $key")
    val entry = sqlConfEntries.get(key)
    if (entry != null) {
      // Only verify configs in the SQLConf object
      entry.valueConverter(value)
    }
    setConfWithCheck(key, value)
  }

  /** Set the given Spark SQL configuration property. */
  def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
    require(entry != null, "entry cannot be null")
    require(value != null, s"value cannot be null for key: ${entry.key}")
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    setConfWithCheck(entry.key, entry.stringConverter(value))
  }

  /** Return the value of Spark SQL configuration property for the given key. */
  @throws[NoSuchElementException]("if key is not set")
  def getConfString(key: String): String = {
    Option(settings.get(key)).
      orElse {
        // Try to use the default value
        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
      }.
      getOrElse(throw new NoSuchElementException(key))
  }

  /**
   * Return the value of Spark SQL configuration property for the given key. If the key is not set
   * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
   * desired one.
   */
  def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
  }
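
  // A typed set/get round trip (illustrative; mirrors how Spark's own tests drive SQLConf):
  //
  //   val conf = new SQLConf
  //   conf.setConf(SQLConf.SHUFFLE_PARTITIONS, 50)             // typed setter, validated
  //   conf.getConf(SQLConf.SHUFFLE_PARTITIONS)                 // => 50
  //   conf.setConfString("spark.sql.shuffle.partitions", "x")  // rejected: "x" is not an int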

  /**
   * Return the value of Spark SQL configuration property for the given key. If the key is not set
   * yet, return `defaultValue` in [[ConfigEntry]].
   */
  def getConf[T](entry: ConfigEntry[T]): T = {
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    entry.readFrom(reader)
  }

  /**
   * Return the value of an optional Spark SQL configuration property for the given key. If the key
   * is not set yet, returns None.
   */
  def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
    entry.readFrom(reader)
  }

  /**
   * Return the `string` value of Spark SQL configuration property for the given key. If the key is
   * not set yet, return `defaultValue`.
   */
  def getConfString(key: String, defaultValue: String): String = {
    val entry = sqlConfEntries.get(key)
    if (entry != null && defaultValue != "<undefined>") {
      // Only verify configs in the SQLConf object
      entry.valueConverter(defaultValue)
    }
    Option(settings.get(key)).getOrElse(defaultValue)
  }

  /**
   * Return all the configuration properties that have been set (i.e. not the default).
   * This creates a new copy of the config properties in the form of a Map.
   */
  def getAllConfs: immutable.Map[String, String] =
    settings.synchronized { settings.asScala.toMap }

  /**
   * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
   * definition contains key, defaultValue and doc.
   */
  def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
    sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
      (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
    }.toSeq
  }

  /**
   * Return whether a given key is set in this [[SQLConf]].
   */
  def contains(key: String): Boolean = {
    settings.containsKey(key)
  }

  private def setConfWithCheck(key: String, value: String): Unit = {
    settings.put(key, value)
  }

  def unsetConf(key: String): Unit = {
    settings.remove(key)
  }

  def unsetConf(entry: ConfigEntry[_]): Unit = {
    settings.remove(entry.key)
  }

  def clear(): Unit = {
    settings.clear()
  }
}