/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Random, Try}

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

/**
 * Tests for [[SparkConf]]: size/time getters, loading from system properties, the named
 * setters, creating a [[SparkContext]] from a conf, deprecated key handling, Kryo class
 * registration and serialization of the conf itself.
 */
class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
  test("Test byteString conversion") {
    val conf = new SparkConf()
    // Simply exercise the API, we don't need a complete conversion test since that's handled in
    // UtilsSuite.scala
    assert(conf.getSizeAsBytes("fake", "1k") === ByteUnit.KiB.toBytes(1))
    assert(conf.getSizeAsKb("fake", "1k") === ByteUnit.KiB.toKiB(1))
    assert(conf.getSizeAsMb("fake", "1k") === ByteUnit.KiB.toMiB(1))
    assert(conf.getSizeAsGb("fake", "1k") === ByteUnit.KiB.toGiB(1))
  }

  test("Test timeString conversion") {
    val conf = new SparkConf()
    // Simply exercise the API, we don't need a complete conversion test since that's handled in
    // UtilsSuite.scala
    assert(conf.getTimeAsMs("fake", "1ms") === TimeUnit.MILLISECONDS.toMillis(1))
    assert(conf.getTimeAsSeconds("fake", "1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
  }

  test("loading from system properties") {
    System.setProperty("spark.test.testProperty", "2")
    System.setProperty("nonspark.test.testProperty", "0")
    val conf = new SparkConf()
    // Only the property whose name starts with "spark." is expected to be picked up.
    assert(conf.get("spark.test.testProperty") === "2")
    assert(!conf.contains("nonspark.test.testProperty"))
  }

  test("initializing without loading defaults") {
    System.setProperty("spark.test.testProperty", "2")
    val conf = new SparkConf(false)
    assert(!conf.contains("spark.test.testProperty"))
  }

  test("named set methods") {
    val conf = new SparkConf(false)

    conf.setMaster("local[3]")
    conf.setAppName("My app")
    conf.setSparkHome("/path")
    conf.setJars(Seq("a.jar", "b.jar"))
    conf.setExecutorEnv("VAR1", "value1")
    conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3")))

    assert(conf.get("spark.master") === "local[3]")
    assert(conf.get("spark.app.name") === "My app")
    assert(conf.get("spark.home") === "/path")
    assert(conf.get("spark.jars") === "a.jar,b.jar")
    assert(conf.get("spark.executorEnv.VAR1") === "value1")
    assert(conf.get("spark.executorEnv.VAR2") === "value2")
    assert(conf.get("spark.executorEnv.VAR3") === "value3")

    // Test the Java-friendly versions of these too
    conf.setJars(Array("c.jar", "d.jar"))
    conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5")))
    assert(conf.get("spark.jars") === "c.jar,d.jar")
    assert(conf.get("spark.executorEnv.VAR4") === "value4")
    assert(conf.get("spark.executorEnv.VAR5") === "value5")
  }

  test("basic get and set") {
    val conf = new SparkConf(false)
    assert(conf.getAll.toSet === Set())
    conf.set("k1", "v1")
    conf.setAll(Seq(("k2", "v2"), ("k3", "v3")))
    assert(conf.getAll.toSet === Set(("k1", "v1"), ("k2", "v2"), ("k3", "v3")))
    // Setting an existing key replaces its value.
    conf.set("k1", "v4")
    conf.setAll(Seq(("k2", "v5"), ("k3", "v6")))
    assert(conf.getAll.toSet === Set(("k1", "v4"), ("k2", "v5"), ("k3", "v6")))
    assert(conf.contains("k1"), "conf did not contain k1")
    assert(!conf.contains("k4"), "conf contained k4")
    assert(conf.get("k1") === "v4")
    // A missing key throws from get(key), but not from get(key, default) or getOption(key).
    intercept[Exception] { conf.get("k4") }
    assert(conf.get("k4", "not found") === "not found")
    assert(conf.getOption("k1") === Some("v4"))
    assert(conf.getOption("k4") === None)
  }

  // NOTE(review): `sc` in the tests below is not declared in this file; presumably it is the
  // context holder from LocalSparkContext -- confirm against that trait.
  test("creating SparkContext without master and app name") {
    val conf = new SparkConf(false)
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext without master") {
    val conf = new SparkConf(false).setAppName("My app")
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext without app name") {
    val conf = new SparkConf(false).setMaster("local")
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext with both master and app name") {
    val conf = new SparkConf(false).setMaster("local").setAppName("My app")
    sc = new SparkContext(conf)
    assert(sc.master === "local")
    assert(sc.appName === "My app")
  }

  test("SparkContext property overriding") {
    val conf = new SparkConf(false).setMaster("local").setAppName("My app")
    // The master and app name passed to the constructor must win over those in the conf.
    sc = new SparkContext("local[2]", "My other app", conf)
    assert(sc.master === "local[2]")
    assert(sc.appName === "My other app")
  }

  test("nested property names") {
    // This wasn't supported by some external conf parsing libraries
    System.setProperty("spark.test.a", "a")
    System.setProperty("spark.test.a.b", "a.b")
    System.setProperty("spark.test.a.b.c", "a.b.c")
    val conf = new SparkConf()
    assert(conf.get("spark.test.a") === "a")
    assert(conf.get("spark.test.a.b") === "a.b")
    assert(conf.get("spark.test.a.b.c") === "a.b.c")
    // Overriding the middle key must not affect its prefix or its extension.
    conf.set("spark.test.a.b", "A.B")
    assert(conf.get("spark.test.a") === "a")
    assert(conf.get("spark.test.a.b") === "A.B")
    assert(conf.get("spark.test.a.b.c") === "a.b.c")
  }

  test("Thread safeness - SPARK-5425") {
    // A background task adds a new system property every millisecond while this thread keeps
    // constructing SparkConf instances that load the system properties.
    val executor = Executors.newSingleThreadScheduledExecutor()
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
    }, 0, 1, TimeUnit.MILLISECONDS)

    try {
      val t0 = System.currentTimeMillis()
      while ((System.currentTimeMillis() - t0) < 1000) {
        val attempt = Try(new SparkConf(loadDefaults = true))
        // Pass the Try as the clue so a failure reports the underlying exception.
        assert(attempt.isSuccess, s"SparkConf construction failed: $attempt")
      }
    } finally {
      executor.shutdownNow()
      // Wait for any in-flight task so that it cannot add a property after the cleanup
      // below has already run.
      executor.awaitTermination(10, TimeUnit.SECONDS)
      val sysProps = System.getProperties
      for (key <- sysProps.stringPropertyNames().asScala if key.startsWith("spark.5425."))
        sysProps.remove(key)
    }
  }

  test("register kryo classes through registerKryoClasses") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

    conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName)

    conf.registerKryoClasses(Array(classOf[Class3]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)

    // Registering an already registered class must not add a duplicate entry.
    conf.registerKryoClasses(Array(classOf[Class2]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
    // blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new Class1())
    serializer.newInstance().serialize(new Class2())
    serializer.newInstance().serialize(new Class3())
  }

  test("register kryo classes through registerKryoClasses and custom registrator") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

    conf.registerKryoClasses(Array(classOf[Class1]))
    assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)

    // Class2 is registered only by CustomRegistrator, not through registerKryoClasses.
    conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
    // blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new Class1())
    serializer.newInstance().serialize(new Class2())
  }

  test("register kryo classes through conf") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
    conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
    conf.set("spark.serializer", classOf[KryoSerializer].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
    // blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new StringBuffer())
  }

  test("deprecated configs") {
    val conf = new SparkConf()
    val newName = "spark.history.fs.update.interval"

    assert(!conf.contains(newName))

    // Each deprecated spelling must be readable through the new key.
    conf.set("spark.history.updateInterval", "1")
    assert(conf.get(newName) === "1")

    conf.set("spark.history.fs.updateInterval", "2")
    assert(conf.get(newName) === "2")

    conf.set("spark.history.fs.update.interval.seconds", "3")
    assert(conf.get(newName) === "3")

    conf.set(newName, "4")
    assert(conf.get(newName) === "4")

    // The three deprecated keys and the new key are all expected to remain in the conf.
    val count = conf.getAll.count { case (k, _) => k.startsWith("spark.history.") }
    assert(count === 4)

    // The deprecated value 42 is expected to be read back as 420 seconds under the new key.
    conf.set("spark.yarn.applicationMaster.waitTries", "42")
    assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)

    // The deprecated value 1.1 is expected to be read back as 1100 under the new key.
    conf.set("spark.kryoserializer.buffer.mb", "1.1")
    assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
  }

  test("akka deprecated configs") {
    val conf = new SparkConf()

    assert(!conf.contains("spark.rpc.numRetries"))
    assert(!conf.contains("spark.rpc.retry.wait"))
    assert(!conf.contains("spark.rpc.askTimeout"))
    assert(!conf.contains("spark.rpc.lookupTimeout"))

    // Values set under the spark.akka.* keys must be visible through the RpcUtils accessors.
    conf.set("spark.akka.num.retries", "1")
    assert(RpcUtils.numRetries(conf) === 1)

    conf.set("spark.akka.retry.wait", "2")
    assert(RpcUtils.retryWaitMs(conf) === 2L)

    conf.set("spark.akka.askTimeout", "3")
    assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))

    conf.set("spark.akka.lookupTimeout", "4")
    assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
  }

  test("SPARK-13727") {
    val conf = new SparkConf()
    // set the conf in the deprecated way
    conf.set("spark.io.compression.lz4.block.size", "12345")
    // get the conf in the recommended way
    assert(conf.get("spark.io.compression.lz4.blockSize") === "12345")
    // we can still get the conf in the deprecated way
    assert(conf.get("spark.io.compression.lz4.block.size") === "12345")
    // the contains() also works as expected
    assert(conf.contains("spark.io.compression.lz4.block.size"))
    assert(conf.contains("spark.io.compression.lz4.blockSize"))
    assert(conf.contains("spark.io.unknown") === false)
  }

  // Built when the suite is constructed so that one test per serializer can be registered below.
  val serializers = Map(
    "java" -> new JavaSerializer(new SparkConf()),
    "kryo" -> new KryoSerializer(new SparkConf()))

  serializers.foreach { case (name, ser) =>
    test(s"SPARK-17240: SparkConf should be serializable ($name)") {
      val conf = new SparkConf()
      // DRIVER_CLASS_PATH is set to a "${...}" reference to the DRIVER_JAVA_OPTIONS key; the
      // original and the deserialized conf must return the same value for it.
      conf.set(DRIVER_CLASS_PATH, "${" + DRIVER_JAVA_OPTIONS.key + "}")
      conf.set(DRIVER_JAVA_OPTIONS, "test")

      val serializer = ser.newInstance()
      val bytes = serializer.serialize(conf)
      val deser = serializer.deserialize[SparkConf](bytes)

      assert(conf.get(DRIVER_CLASS_PATH) === deser.get(DRIVER_CLASS_PATH))
    }
  }

}

// Empty marker classes used by the Kryo registration tests.
class Class1 {}
class Class2 {}
class Class3 {}

/** Registrator used by the Kryo tests; it registers only [[Class2]]. */
class CustomRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Class2])
  }
}