/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark import java.util.concurrent.{Executors, TimeUnit} import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.{Random, Try} import com.esotericsoftware.kryo.Kryo import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer} import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties { test("Test byteString conversion") { val conf = new SparkConf() // Simply exercise the API, we don't need a complete conversion test since that's handled in // UtilsSuite.scala assert(conf.getSizeAsBytes("fake", "1k") === ByteUnit.KiB.toBytes(1)) assert(conf.getSizeAsKb("fake", "1k") === ByteUnit.KiB.toKiB(1)) assert(conf.getSizeAsMb("fake", "1k") === ByteUnit.KiB.toMiB(1)) assert(conf.getSizeAsGb("fake", "1k") === ByteUnit.KiB.toGiB(1)) } test("Test timeString conversion") { val conf = new SparkConf() // Simply exercise the API, we don't need a complete conversion test since that's handled in // UtilsSuite.scala assert(conf.getTimeAsMs("fake", "1ms") === TimeUnit.MILLISECONDS.toMillis(1)) assert(conf.getTimeAsSeconds("fake", "1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000)) } test("loading from system properties") { System.setProperty("spark.test.testProperty", "2") System.setProperty("nonspark.test.testProperty", "0") val conf = new SparkConf() assert(conf.get("spark.test.testProperty") === "2") assert(!conf.contains("nonspark.test.testProperty")) } test("initializing without loading defaults") { System.setProperty("spark.test.testProperty", "2") val conf = new SparkConf(false) assert(!conf.contains("spark.test.testProperty")) } test("named set methods") { val conf = new SparkConf(false) conf.setMaster("local[3]") conf.setAppName("My app") conf.setSparkHome("/path") conf.setJars(Seq("a.jar", "b.jar")) conf.setExecutorEnv("VAR1", "value1") conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3"))) assert(conf.get("spark.master") === "local[3]") assert(conf.get("spark.app.name") === "My app") assert(conf.get("spark.home") === "/path") assert(conf.get("spark.jars") === "a.jar,b.jar") assert(conf.get("spark.executorEnv.VAR1") === "value1") assert(conf.get("spark.executorEnv.VAR2") === "value2") assert(conf.get("spark.executorEnv.VAR3") === "value3") // Test the Java-friendly versions of these too conf.setJars(Array("c.jar", "d.jar")) conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5"))) assert(conf.get("spark.jars") === "c.jar,d.jar") assert(conf.get("spark.executorEnv.VAR4") === "value4") assert(conf.get("spark.executorEnv.VAR5") === "value5") } test("basic get and set") { val conf = new SparkConf(false) assert(conf.getAll.toSet === Set()) conf.set("k1", "v1") conf.setAll(Seq(("k2", "v2"), ("k3", "v3"))) assert(conf.getAll.toSet === Set(("k1", "v1"), ("k2", "v2"), ("k3", "v3"))) conf.set("k1", "v4") conf.setAll(Seq(("k2", "v5"), ("k3", "v6"))) assert(conf.getAll.toSet === Set(("k1", "v4"), ("k2", "v5"), ("k3", "v6"))) assert(conf.contains("k1"), "conf did not contain k1") assert(!conf.contains("k4"), "conf contained k4") assert(conf.get("k1") === "v4") intercept[Exception] { conf.get("k4") } assert(conf.get("k4", "not found") === "not found") assert(conf.getOption("k1") === Some("v4")) assert(conf.getOption("k4") === None) } test("creating SparkContext without master and app name") { val conf = new SparkConf(false) intercept[SparkException] { sc = new SparkContext(conf) } } test("creating SparkContext without master") { val conf = new SparkConf(false).setAppName("My app") intercept[SparkException] { sc = new SparkContext(conf) } } test("creating SparkContext without app name") { val conf = new SparkConf(false).setMaster("local") intercept[SparkException] { sc = new SparkContext(conf) } } test("creating SparkContext with both master and app name") { val conf = new SparkConf(false).setMaster("local").setAppName("My app") sc = new SparkContext(conf) assert(sc.master === "local") assert(sc.appName === "My app") } test("SparkContext property overriding") { val conf = new SparkConf(false).setMaster("local").setAppName("My app") sc = new SparkContext("local[2]", "My other app", conf) assert(sc.master === "local[2]") assert(sc.appName === "My other app") } test("nested property names") { // This wasn't supported by some external conf parsing libraries System.setProperty("spark.test.a", "a") System.setProperty("spark.test.a.b", "a.b") System.setProperty("spark.test.a.b.c", "a.b.c") val conf = new SparkConf() assert(conf.get("spark.test.a") === "a") assert(conf.get("spark.test.a.b") === "a.b") assert(conf.get("spark.test.a.b.c") === "a.b.c") conf.set("spark.test.a.b", "A.B") assert(conf.get("spark.test.a") === "a") assert(conf.get("spark.test.a.b") === "A.B") assert(conf.get("spark.test.a.b.c") === "a.b.c") } test("Thread safeness - SPARK-5425") { val executor = Executors.newSingleThreadScheduledExecutor() val sf = executor.scheduleAtFixedRate(new Runnable { override def run(): Unit = System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString) }, 0, 1, TimeUnit.MILLISECONDS) try { val t0 = System.currentTimeMillis() while ((System.currentTimeMillis() - t0) < 1000) { val conf = Try(new SparkConf(loadDefaults = true)) assert(conf.isSuccess === true) } } finally { executor.shutdownNow() val sysProps = System.getProperties for (key <- sysProps.stringPropertyNames().asScala if key.startsWith("spark.5425.")) sysProps.remove(key) } } test("register kryo classes through registerKryoClasses") { val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2])) assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName + "," + classOf[Class2].getName) conf.registerKryoClasses(Array(classOf[Class3])) assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) conf.registerKryoClasses(Array(classOf[Class2])) assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. val serializer = new KryoSerializer(conf) serializer.newInstance().serialize(new Class1()) serializer.newInstance().serialize(new Class2()) serializer.newInstance().serialize(new Class3()) } test("register kryo classes through registerKryoClasses and custom registrator") { val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") conf.registerKryoClasses(Array(classOf[Class1])) assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName) conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. val serializer = new KryoSerializer(conf) serializer.newInstance().serialize(new Class1()) serializer.newInstance().serialize(new Class2()) } test("register kryo classes through conf") { val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer") conf.set("spark.serializer", classOf[KryoSerializer].getName) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. val serializer = new KryoSerializer(conf) serializer.newInstance().serialize(new StringBuffer()) } test("deprecated configs") { val conf = new SparkConf() val newName = "spark.history.fs.update.interval" assert(!conf.contains(newName)) conf.set("spark.history.updateInterval", "1") assert(conf.get(newName) === "1") conf.set("spark.history.fs.updateInterval", "2") assert(conf.get(newName) === "2") conf.set("spark.history.fs.update.interval.seconds", "3") assert(conf.get(newName) === "3") conf.set(newName, "4") assert(conf.get(newName) === "4") val count = conf.getAll.count { case (k, v) => k.startsWith("spark.history.") } assert(count === 4) conf.set("spark.yarn.applicationMaster.waitTries", "42") assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420) conf.set("spark.kryoserializer.buffer.mb", "1.1") assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100) } test("akka deprecated configs") { val conf = new SparkConf() assert(!conf.contains("spark.rpc.numRetries")) assert(!conf.contains("spark.rpc.retry.wait")) assert(!conf.contains("spark.rpc.askTimeout")) assert(!conf.contains("spark.rpc.lookupTimeout")) conf.set("spark.akka.num.retries", "1") assert(RpcUtils.numRetries(conf) === 1) conf.set("spark.akka.retry.wait", "2") assert(RpcUtils.retryWaitMs(conf) === 2L) conf.set("spark.akka.askTimeout", "3") assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds)) conf.set("spark.akka.lookupTimeout", "4") assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds)) } test("SPARK-13727") { val conf = new SparkConf() // set the conf in the deprecated way conf.set("spark.io.compression.lz4.block.size", "12345") // get the conf in the recommended way assert(conf.get("spark.io.compression.lz4.blockSize") === "12345") // we can still get the conf in the deprecated way assert(conf.get("spark.io.compression.lz4.block.size") === "12345") // the contains() also works as expected assert(conf.contains("spark.io.compression.lz4.block.size")) assert(conf.contains("spark.io.compression.lz4.blockSize")) assert(conf.contains("spark.io.unknown") === false) } val serializers = Map( "java" -> new JavaSerializer(new SparkConf()), "kryo" -> new KryoSerializer(new SparkConf())) serializers.foreach { case (name, ser) => test(s"SPARK-17240: SparkConf should be serializable ($name)") { val conf = new SparkConf() conf.set(DRIVER_CLASS_PATH, "${" + DRIVER_JAVA_OPTIONS.key + "}") conf.set(DRIVER_JAVA_OPTIONS, "test") val serializer = ser.newInstance() val bytes = serializer.serialize(conf) val deser = serializer.deserialize[SparkConf](bytes) assert(conf.get(DRIVER_CLASS_PATH) === deser.get(DRIVER_CLASS_PATH)) } } } class Class1 {} class Class2 {} class Class3 {} class CustomRegistrator extends KryoRegistrator { def registerClasses(kryo: Kryo) { kryo.register(classOf[Class2]) } }