/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Random, Try}

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
35  test("Test byteString conversion") {
36    val conf = new SparkConf()
37    // Simply exercise the API, we don't need a complete conversion test since that's handled in
38    // UtilsSuite.scala
39    assert(conf.getSizeAsBytes("fake", "1k") === ByteUnit.KiB.toBytes(1))
40    assert(conf.getSizeAsKb("fake", "1k") === ByteUnit.KiB.toKiB(1))
41    assert(conf.getSizeAsMb("fake", "1k") === ByteUnit.KiB.toMiB(1))
42    assert(conf.getSizeAsGb("fake", "1k") === ByteUnit.KiB.toGiB(1))
43  }
44
45  test("Test timeString conversion") {
46    val conf = new SparkConf()
47    // Simply exercise the API, we don't need a complete conversion test since that's handled in
48    // UtilsSuite.scala
49    assert(conf.getTimeAsMs("fake", "1ms") === TimeUnit.MILLISECONDS.toMillis(1))
50    assert(conf.getTimeAsSeconds("fake", "1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
51  }

  test("loading from system properties") {
    System.setProperty("spark.test.testProperty", "2")
    System.setProperty("nonspark.test.testProperty", "0")
    val conf = new SparkConf()
    assert(conf.get("spark.test.testProperty") === "2")
    assert(!conf.contains("nonspark.test.testProperty"))
  }

  test("initializing without loading defaults") {
    System.setProperty("spark.test.testProperty", "2")
    val conf = new SparkConf(false)
    assert(!conf.contains("spark.test.testProperty"))
  }

  test("named set methods") {
    val conf = new SparkConf(false)

    conf.setMaster("local[3]")
    conf.setAppName("My app")
    conf.setSparkHome("/path")
    conf.setJars(Seq("a.jar", "b.jar"))
    conf.setExecutorEnv("VAR1", "value1")
    conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3")))

    assert(conf.get("spark.master") === "local[3]")
    assert(conf.get("spark.app.name") === "My app")
    assert(conf.get("spark.home") === "/path")
    assert(conf.get("spark.jars") === "a.jar,b.jar")
    assert(conf.get("spark.executorEnv.VAR1") === "value1")
    assert(conf.get("spark.executorEnv.VAR2") === "value2")
    assert(conf.get("spark.executorEnv.VAR3") === "value3")

    // Test the Java-friendly versions of these too
    conf.setJars(Array("c.jar", "d.jar"))
    conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5")))
    assert(conf.get("spark.jars") === "c.jar,d.jar")
    assert(conf.get("spark.executorEnv.VAR4") === "value4")
    assert(conf.get("spark.executorEnv.VAR5") === "value5")
  }

  test("basic get and set") {
    val conf = new SparkConf(false)
    assert(conf.getAll.toSet === Set())
    conf.set("k1", "v1")
    conf.setAll(Seq(("k2", "v2"), ("k3", "v3")))
    assert(conf.getAll.toSet === Set(("k1", "v1"), ("k2", "v2"), ("k3", "v3")))
    conf.set("k1", "v4")
    conf.setAll(Seq(("k2", "v5"), ("k3", "v6")))
    assert(conf.getAll.toSet === Set(("k1", "v4"), ("k2", "v5"), ("k3", "v6")))
    assert(conf.contains("k1"), "conf did not contain k1")
    assert(!conf.contains("k4"), "conf contained k4")
    assert(conf.get("k1") === "v4")
    intercept[Exception] { conf.get("k4") }
    assert(conf.get("k4", "not found") === "not found")
    assert(conf.getOption("k1") === Some("v4"))
    assert(conf.getOption("k4") === None)
  }

  test("creating SparkContext without master and app name") {
    val conf = new SparkConf(false)
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext without master") {
    val conf = new SparkConf(false).setAppName("My app")
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext without app name") {
    val conf = new SparkConf(false).setMaster("local")
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext with both master and app name") {
    val conf = new SparkConf(false).setMaster("local").setAppName("My app")
    sc = new SparkContext(conf)
    assert(sc.master === "local")
    assert(sc.appName === "My app")
  }

  test("SparkContext property overriding") {
    val conf = new SparkConf(false).setMaster("local").setAppName("My app")
    sc = new SparkContext("local[2]", "My other app", conf)
    assert(sc.master === "local[2]")
    assert(sc.appName === "My other app")
  }

  test("nested property names") {
    // This wasn't supported by some external conf parsing libraries
    System.setProperty("spark.test.a", "a")
    System.setProperty("spark.test.a.b", "a.b")
    System.setProperty("spark.test.a.b.c", "a.b.c")
    val conf = new SparkConf()
    assert(conf.get("spark.test.a") === "a")
    assert(conf.get("spark.test.a.b") === "a.b")
    assert(conf.get("spark.test.a.b.c") === "a.b.c")
    conf.set("spark.test.a.b", "A.B")
    assert(conf.get("spark.test.a") === "a")
    assert(conf.get("spark.test.a.b") === "A.B")
    assert(conf.get("spark.test.a.b.c") === "a.b.c")
  }

  test("Thread safeness - SPARK-5425") {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val sf = executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
    }, 0, 1, TimeUnit.MILLISECONDS)

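    // Keep constructing SparkConf while the background task mutates system properties; any
    // exception thrown during construction surfaces as a Failure below.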
    try {
      val t0 = System.currentTimeMillis()
      while ((System.currentTimeMillis() - t0) < 1000) {
        val conf = Try(new SparkConf(loadDefaults = true))
        assert(conf.isSuccess === true)
      }
    } finally {
      executor.shutdownNow()
      val sysProps = System.getProperties
      for (key <- sysProps.stringPropertyNames().asScala if key.startsWith("spark.5425."))
        sysProps.remove(key)
    }
  }

  test("register kryo classes through registerKryoClasses") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

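    // Repeated registrations accumulate in spark.kryo.classesToRegister without duplicates.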
    conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName)

    conf.registerKryoClasses(Array(classOf[Class3]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)

    conf.registerKryoClasses(Array(classOf[Class2]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this
    // doesn't blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new Class1())
    serializer.newInstance().serialize(new Class2())
    serializer.newInstance().serialize(new Class3())
  }

  test("register kryo classes through registerKryoClasses and custom registrator") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

    conf.registerKryoClasses(Array(classOf[Class1]))
    assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)

    conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this
    // doesn't blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new Class1())
    serializer.newInstance().serialize(new Class2())
  }

  test("register kryo classes through conf") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
    conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
    conf.set("spark.serializer", classOf[KryoSerializer].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this
    // doesn't blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new StringBuffer())
  }

  test("deprecated configs") {
    val conf = new SparkConf()
    val newName = "spark.history.fs.update.interval"

    assert(!conf.contains(newName))

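    // Each deprecated spelling set below should be readable through the new key.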
    conf.set("spark.history.updateInterval", "1")
    assert(conf.get(newName) === "1")

    conf.set("spark.history.fs.updateInterval", "2")
    assert(conf.get(newName) === "2")

    conf.set("spark.history.fs.update.interval.seconds", "3")
    assert(conf.get(newName) === "3")

    conf.set(newName, "4")
    assert(conf.get(newName) === "4")

    val count = conf.getAll.count { case (k, v) => k.startsWith("spark.history.") }
    assert(count === 4)

    conf.set("spark.yarn.applicationMaster.waitTries", "42")
    assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)

    conf.set("spark.kryoserializer.buffer.mb", "1.1")
    assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
  }

  test("akka deprecated configs") {
    val conf = new SparkConf()

    assert(!conf.contains("spark.rpc.numRetries"))
    assert(!conf.contains("spark.rpc.retry.wait"))
    assert(!conf.contains("spark.rpc.askTimeout"))
    assert(!conf.contains("spark.rpc.lookupTimeout"))

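    // Values set under the old spark.akka.* keys should be picked up by the RpcUtils helpers.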
    conf.set("spark.akka.num.retries", "1")
    assert(RpcUtils.numRetries(conf) === 1)

    conf.set("spark.akka.retry.wait", "2")
    assert(RpcUtils.retryWaitMs(conf) === 2L)

    conf.set("spark.akka.askTimeout", "3")
    assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))

    conf.set("spark.akka.lookupTimeout", "4")
    assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
  }

  test("SPARK-13727") {
    val conf = new SparkConf()
    // Set the conf using the deprecated key
    conf.set("spark.io.compression.lz4.block.size", "12345")
    // The value is visible under the recommended key
    assert(conf.get("spark.io.compression.lz4.blockSize") === "12345")
    // ... and can still be read through the deprecated key
    assert(conf.get("spark.io.compression.lz4.block.size") === "12345")
    // contains() also works with both spellings
    assert(conf.contains("spark.io.compression.lz4.block.size"))
    assert(conf.contains("spark.io.compression.lz4.blockSize"))
    assert(conf.contains("spark.io.unknown") === false)
  }

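  // Run the serializability check below once per built-in serializer.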
  val serializers = Map(
    "java" -> new JavaSerializer(new SparkConf()),
    "kryo" -> new KryoSerializer(new SparkConf()))

  serializers.foreach { case (name, ser) =>
    test(s"SPARK-17240: SparkConf should be serializable ($name)") {
      val conf = new SparkConf()
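      // The class path value references another config entry using ${...} substitution syntax.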
      conf.set(DRIVER_CLASS_PATH, "${" + DRIVER_JAVA_OPTIONS.key + "}")
      conf.set(DRIVER_JAVA_OPTIONS, "test")

      val serializer = ser.newInstance()
      val bytes = serializer.serialize(conf)
      val deser = serializer.deserialize[SparkConf](bytes)

      assert(conf.get(DRIVER_CLASS_PATH) === deser.get(DRIVER_CLASS_PATH))
    }
  }

}

class Class1 {}
class Class2 {}
class Class3 {}

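// Used by the custom registrator test above: registers Class2 in addition to the classes listed
// in spark.kryo.classesToRegister.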
class CustomRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Class2])
  }
}