1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.serializer
19
20import java.io._
21import java.nio.ByteBuffer
22import javax.annotation.Nullable
23
24import scala.collection.JavaConverters._
25import scala.collection.mutable.ArrayBuffer
26import scala.reflect.ClassTag
27
28import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
29import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
30import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput}
31import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
32import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
33import org.apache.avro.generic.{GenericData, GenericRecord}
34import org.roaringbitmap.RoaringBitmap
35
36import org.apache.spark._
37import org.apache.spark.api.python.PythonBroadcast
38import org.apache.spark.internal.Logging
39import org.apache.spark.network.util.ByteUnit
40import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
41import org.apache.spark.storage._
42import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
43import org.apache.spark.util.collection.CompactBuffer
44
/**
 * A Spark serializer that uses the <a href="https://github.com/EsotericSoftware/kryo">
 * Kryo serialization library</a>.
 *
 * @note This serializer is not guaranteed to be wire-compatible across different versions of
 * Spark. It is intended to be used to serialize/de-serialize data within a single
 * Spark application.
 */
class KryoSerializer(conf: SparkConf)
  extends org.apache.spark.serializer.Serializer
  with Logging
  with Serializable {

  // Initial Kryo output buffer size (spark.kryoserializer.buffer), in KiB.
  private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")

  if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
    throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
      s"2048 mb, got: ${ByteUnit.KiB.toMiB(bufferSizeKb)} mb.")
  }
  private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt

  /**
   * Maximum size (in MiB) the Kryo output buffer may grow to (spark.kryoserializer.buffer.max).
   * The configured value is range-checked as a Long before narrowing to Int, so an enormous
   * setting cannot wrap around and slip past the 2048 MiB limit.
   */
  val maxBufferSizeMb: Int = {
    val configuredMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m")
    if (configuredMb >= ByteUnit.GiB.toMiB(2)) {
      throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
        s"2048 mb, got: $configuredMb mb.")
    }
    configuredMb.toInt
  }
  private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt

  private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
  // Comma-separated KryoRegistrator class names; blank entries are ignored.
  private val userRegistrators = conf.get("spark.kryo.registrator", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)
  // Comma-separated class names to register directly; blank entries are ignored.
  private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)

  private val avroSchemas = conf.getAvroSchema
  // whether to use unsafe based IO for serialization
  private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)

  /**
   * Creates a Kryo output buffer starting at `bufferSize` bytes and allowed to grow up to
   * `max(bufferSize, maxBufferSize)` bytes. Uses the unsafe variant when spark.kryo.unsafe is set.
   */
  def newKryoOutput(): KryoOutput =
    if (useUnsafe) {
      new KryoUnsafeOutput(bufferSize, math.max(bufferSize, maxBufferSize))
    } else {
      new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
    }

  /**
   * Builds a fully configured Kryo instance: Spark's own registrations, the classes listed in
   * spark.kryo.classesToRegister, any user KryoRegistrators, Chill's Scala registrations and a
   * few extra Scala types. The instance's class loader is `defaultClassLoader` if set, otherwise
   * the current thread's context class loader.
   *
   * @throws SparkException if a user class or registrator cannot be loaded or applied.
   */
  def newKryo(): Kryo = {
    val instantiator = new EmptyScalaKryoInstantiator
    val kryo = instantiator.newKryo()
    kryo.setRegistrationRequired(registrationRequired)

    val oldClassLoader = Thread.currentThread.getContextClassLoader
    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)

    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
    // Do this before we invoke the user registrator so the user registrator can override this.
    kryo.setReferences(referenceTracking)

    for (cls <- KryoSerializer.toRegister) {
      kryo.register(cls)
    }
    for ((cls, ser) <- KryoSerializer.toRegisterSerializer) {
      kryo.register(cls, ser)
    }

    // For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
    kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)

    // Allow sending classes with custom Java serializers
    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
    kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
    kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
    kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))

    try {
      // scalastyle:off classforname
      // Use the default classloader when calling the user registrator.
      Thread.currentThread.setContextClassLoader(classLoader)
      // Register classes given through spark.kryo.classesToRegister.
      classesToRegister
        .foreach { className => kryo.register(Class.forName(className, true, classLoader)) }
      // Allow the user to register their own classes by setting spark.kryo.registrator.
      userRegistrators
        .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator])
        .foreach { reg => reg.registerClasses(kryo) }
      // scalastyle:on classforname
    } catch {
      case e: Exception =>
        throw new SparkException("Failed to register classes with Kryo", e)
    } finally {
      // Always restore the caller's context class loader, even if registration failed.
      Thread.currentThread.setContextClassLoader(oldClassLoader)
    }

    // Register Chill's classes; we do this after our ranges and the user's own classes to let
    // our code override the generic serializers in Chill for things like Seq
    new AllScalaRegistrar().apply(kryo)

    // Register types missed by Chill.
    // scalastyle:off
    kryo.register(classOf[Array[Tuple1[Any]]])
    kryo.register(classOf[Array[Tuple2[Any, Any]]])
    kryo.register(classOf[Array[Tuple3[Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple4[Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple5[Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple6[Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple7[Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple8[Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple9[Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])

    // scalastyle:on

    kryo.register(None.getClass)
    kryo.register(Nil.getClass)
    kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
    kryo.register(classOf[ArrayBuffer[Any]])

    kryo.setClassLoader(classLoader)
    kryo
  }

  override def newInstance(): SerializerInstance = {
    new KryoSerializerInstance(this, useUnsafe)
  }

  private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
  }
}
194
/**
 * A [[SerializationStream]] that writes Kryo-encoded objects (class tag + payload) to `outStream`.
 * It holds one Kryo instance borrowed from `serInstance` until `close()` returns it.
 */
private[spark]
class KryoSerializationStream(
    serInstance: KryoSerializerInstance,
    outStream: OutputStream,
    useUnsafe: Boolean) extends SerializationStream {

  // Kryo buffer over `outStream`; set to null by close() to mark the stream as closed.
  private[this] var output: KryoOutput =
    if (useUnsafe) new KryoUnsafeOutput(outStream) else new KryoOutput(outStream)

  // Borrowed for the lifetime of this stream and released in close().
  private[this] var kryo: Kryo = serInstance.borrowKryo()

  /**
   * Writes `t` together with its class information.
   *
   * @throws IOException if the stream has already been closed.
   */
  override def writeObject[T: ClassTag](t: T): SerializationStream = {
    ensureOpen()
    kryo.writeClassAndObject(output, t)
    this
  }

  /**
   * Flushes buffered bytes through to the underlying stream.
   *
   * @throws IOException if the stream has already been closed.
   */
  override def flush(): Unit = {
    ensureOpen()
    output.flush()
  }

  /** Closes the Kryo output (and thus `outStream`) and returns the Kryo instance. Idempotent. */
  override def close(): Unit = {
    if (output != null) {
      try {
        output.close()
      } finally {
        serInstance.releaseKryo(kryo)
        kryo = null
        output = null
      }
    }
  }

  /** Fails with an IOException, rather than a NullPointerException, once close() has run. */
  private def ensureOpen(): Unit = {
    if (output == null) {
      throw new IOException("Stream is closed")
    }
  }
}
230
/**
 * A [[DeserializationStream]] that reads Kryo-encoded objects (class tag + payload) from
 * `inStream`. It holds one Kryo instance borrowed from `serInstance` until `close()` returns it.
 */
private[spark]
class KryoDeserializationStream(
    serInstance: KryoSerializerInstance,
    inStream: InputStream,
    useUnsafe: Boolean) extends DeserializationStream {

  // Kryo buffer over `inStream`; set to null by close() to mark the stream as closed.
  private[this] var input: KryoInput =
    if (useUnsafe) new KryoUnsafeInput(inStream) else new KryoInput(inStream)

  // Borrowed for the lifetime of this stream and released in close().
  private[this] var kryo: Kryo = serInstance.borrowKryo()

  /**
   * Reads the next object.
   *
   * @throws EOFException when Kryo reports a buffer underflow, i.e. the input is exhausted.
   * @throws IOException if the stream has already been closed.
   */
  override def readObject[T: ClassTag](): T = {
    if (input == null) {
      throw new IOException("Stream is closed")
    }
    try {
      kryo.readClassAndObject(input).asInstanceOf[T]
    } catch {
      // DeserializationStream uses the EOF exception to indicate stopping condition.
      case e: KryoException if isBufferUnderflow(e) =>
        throw new EOFException
    }
  }

  /** Closes the Kryo input (and thus `inStream`) and returns the Kryo instance. Idempotent. */
  override def close(): Unit = {
    if (input != null) {
      try {
        // Kryo's Input automatically closes the input stream it is using.
        input.close()
      } finally {
        serInstance.releaseKryo(kryo)
        kryo = null
        input = null
      }
    }
  }

  /**
   * True if `e` signals that the input ran out of bytes. The check is null-safe (a KryoException
   * may carry no message) and locale-independent.
   */
  private def isBufferUnderflow(e: KryoException): Boolean =
    Option(e.getMessage).exists(_.toLowerCase(java.util.Locale.ROOT).contains("buffer underflow"))
}
265
/**
 * A [[SerializerInstance]] backed by Kryo. It caches one Kryo instance plus reusable input/output
 * buffers, so, like other SerializerInstances, it must not be shared across threads.
 */
private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
  extends SerializerInstance {
  /**
   * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
   * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
   * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
   * not synchronized.
   */
  // During construction the field is still null, so this call allocates a fresh Kryo eagerly.
  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()

  /**
   * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
   * otherwise, it allocates a new instance.
   */
  private[serializer] def borrowKryo(): Kryo = {
    if (cachedKryo != null) {
      val kryo = cachedKryo
      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
      kryo.reset()
      cachedKryo = null
      kryo
    } else {
      ks.newKryo()
    }
  }

  /**
   * Release a borrowed [[Kryo]] instance. If this serializer instance already has a cached Kryo
   * instance, then the given Kryo instance is discarded; otherwise, the Kryo is stored for later
   * re-use.
   */
  private[serializer] def releaseKryo(kryo: Kryo): Unit = {
    if (cachedKryo == null) {
      cachedKryo = kryo
    }
  }

  // Make these lazy vals to avoid creating a buffer unless we use them.
  private lazy val output = ks.newKryoOutput()
  private lazy val input = if (useUnsafe) new KryoUnsafeInput() else new KryoInput()

  /**
   * Serializes `t` (with its class information) into a new heap ByteBuffer.
   *
   * @throws SparkException if the object does not fit in spark.kryoserializer.buffer.max.
   */
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    output.clear()
    val kryo = borrowKryo()
    try {
      kryo.writeClassAndObject(output, t)
    } catch {
      // getMessage may be null for a KryoException, so test it null-safely.
      case e: KryoException if Option(e.getMessage).exists(_.startsWith("Buffer overflow")) =>
        throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
          "increase spark.kryoserializer.buffer.max value.", e)
    } finally {
      releaseKryo(kryo)
    }
    ByteBuffer.wrap(output.toBytes)
  }

  /** Deserializes one object from the remaining bytes of `bytes` (its position is not moved). */
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val kryo = borrowKryo()
    try {
      setInputBuffer(bytes)
      kryo.readClassAndObject(input).asInstanceOf[T]
    } finally {
      releaseKryo(kryo)
    }
  }

  /**
   * Like `deserialize(bytes)`, but resolves classes through `loader`. The Kryo instance's previous
   * class loader is restored afterwards.
   */
  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    val kryo = borrowKryo()
    val oldClassLoader = kryo.getClassLoader
    try {
      kryo.setClassLoader(loader)
      setInputBuffer(bytes)
      kryo.readClassAndObject(input).asInstanceOf[T]
    } finally {
      kryo.setClassLoader(oldClassLoader)
      releaseKryo(kryo)
    }
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    new KryoSerializationStream(this, s, useUnsafe)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new KryoDeserializationStream(this, s, useUnsafe)
  }

  /**
   * Returns true if auto-reset is on. The only reason this would be false is if the user-supplied
   * registrator explicitly turns auto-reset off.
   */
  def getAutoReset(): Boolean = {
    val field = classOf[Kryo].getDeclaredField("autoReset")
    field.setAccessible(true)
    val kryo = borrowKryo()
    try {
      field.get(kryo).asInstanceOf[Boolean]
    } finally {
      releaseKryo(kryo)
    }
  }

  /**
   * Points the reusable `input` at the remaining bytes of `bytes` without moving its position.
   * Buffers with an accessible backing array are read in place. Direct and read-only buffers,
   * for which `array()` would throw, are first copied into a fresh array.
   */
  private def setInputBuffer(bytes: ByteBuffer): Unit = {
    if (bytes.hasArray) {
      input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    } else {
      val copy = new Array[Byte](bytes.remaining())
      bytes.duplicate().get(copy)
      input.setBuffer(copy)
    }
  }
}
369
/**
 * Extension point that lets applications register their own classes (and, optionally, custom
 * serializers) with Kryo when Kryo serialization is in use. Implementations are named in
 * `spark.kryo.registrator` and are instantiated reflectively through a no-argument constructor.
 */
trait KryoRegistrator {

  /** Registers this implementation's classes on the supplied [[Kryo]] instance. */
  def registerClasses(kryo: Kryo): Unit
}
377
private[serializer] object KryoSerializer {

  /**
   * Classes registered with Kryo's default serializers on every new Kryo instance.
   * The order is significant: it determines the registration ids Kryo assigns.
   */
  private val toRegister: Seq[Class[_]] = Seq(
    // NIO heap buffer implementation class
    ByteBuffer.allocate(1).getClass,
    // storage and scheduler metadata
    classOf[StorageLevel],
    classOf[CompressedMapStatus],
    classOf[HighlyCompressedMapStatus],
    classOf[CompactBuffer[_]],
    classOf[BlockManagerId],
    // primitive arrays
    classOf[Array[Byte]],
    classOf[Array[Short]],
    classOf[Array[Long]],
    // miscellaneous utilities
    classOf[BoundedPriorityQueue[_]],
    classOf[SparkConf]
  )

  /**
   * Kryo serializer for [[RoaringBitmap]]. It delegates to the bitmap's own serialize/deserialize
   * methods through the ObjectOutput/ObjectInput bridges over Kryo's output/input.
   */
  private val roaringBitmapSerializer: KryoClassSerializer[RoaringBitmap] =
    new KryoClassSerializer[RoaringBitmap]() {
      override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit =
        bitmap.serialize(new KryoOutputObjectOutputBridge(kryo, output))

      override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
        val restored = new RoaringBitmap
        restored.deserialize(new KryoInputObjectInputBridge(kryo, input))
        restored
      }
    }

  /** Classes registered together with a dedicated serializer. */
  private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
    classOf[RoaringBitmap] -> roaringBitmapSerializer
  )
}
407
/**
 * This is a bridge class to wrap KryoInput as an InputStream and ObjectInput. It forwards all
 * methods of InputStream and ObjectInput to KryoInput. It's usually helpful when an API expects
 * an InputStream or ObjectInput but you want to use Kryo.
 */
private[spark] class KryoInputObjectInputBridge(
    kryo: Kryo, input: KryoInput) extends FilterInputStream(input) with ObjectInput {
  override def readLong(): Long = input.readLong()
  override def readChar(): Char = input.readChar()
  override def readFloat(): Float = input.readFloat()
  override def readByte(): Byte = input.readByte()
  override def readShort(): Short = input.readShort()
  override def readUTF(): String = input.readString() // readString in kryo does utf8
  override def readInt(): Int = input.readInt()
  override def readUnsignedShort(): Int = input.readShortUnsigned()
  override def skipBytes(n: Int): Int = {
    input.skip(n)
    n
  }

  /** Fills all of `b`; see the three-argument overload. */
  override def readFully(b: Array[Byte]): Unit = readFully(b, 0, b.length)

  /**
   * Reads exactly `len` bytes into `b` starting at `off`, as `DataInput.readFully` requires.
   * Kryo's `Input.read` may return fewer bytes than requested (or -1 at end of data), so this
   * loops until the range is filled.
   *
   * @throws EOFException if the input ends before `len` bytes have been read.
   */
  override def readFully(b: Array[Byte], off: Int, len: Int): Unit = {
    var filled = 0
    while (filled < len) {
      val n = input.read(b, off + filled, len - filled)
      // A non-positive count means no more data is available; treating 0 as end-of-data as well
      // guarantees the loop cannot spin forever.
      if (n <= 0) {
        throw new EOFException(s"Reached end of input after reading $filled of $len bytes")
      }
      filled += n
    }
  }

  override def readLine(): String = throw new UnsupportedOperationException("readLine")
  override def readBoolean(): Boolean = input.readBoolean()
  override def readUnsignedByte(): Int = input.readByteUnsigned()
  override def readDouble(): Double = input.readDouble()
  override def readObject(): AnyRef = kryo.readClassAndObject(input)
}
435
/**
 * Adapts a KryoOutput to the OutputStream and ObjectOutput interfaces, for APIs that write
 * through those Java interfaces but should target Kryo. Every write is forwarded to the wrapped
 * `output`; `writeObject` goes through `kryo.writeClassAndObject`.
 */
private[spark] class KryoOutputObjectOutputBridge(
    kryo: Kryo, output: KryoOutput) extends FilterOutputStream(output) with ObjectOutput  {

  // Raw bytes (OutputStream).
  override def write(b: Int): Unit = output.write(b)
  override def write(b: Array[Byte]): Unit = output.write(b)
  override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)

  // Primitive values (DataOutput).
  override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
  override def writeByte(v: Int): Unit = output.writeByte(v)
  override def writeShort(v: Int): Unit = output.writeShort(v)
  override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
  override def writeInt(v: Int): Unit = output.writeInt(v)
  override def writeLong(v: Long): Unit = output.writeLong(v)
  override def writeFloat(v: Float): Unit = output.writeFloat(v)
  override def writeDouble(v: Double): Unit = output.writeDouble(v)

  // Strings. Both writeUTF and writeBytes go through Kryo's writeString (UTF-8); note that
  // writeBytes therefore does not emit one low-order byte per char as DataOutput describes.
  override def writeUTF(s: String): Unit = output.writeString(s)
  override def writeBytes(s: String): Unit = output.writeString(s)
  // There is no "readChars" counterpart, except maybe "readLine", which is not supported
  override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")

  // Arbitrary objects (ObjectOutput), written with their class information.
  override def writeObject(obj: AnyRef): Unit = kryo.writeClassAndObject(output, obj)
}
460
/**
 * A Kryo serializer for serializing results returned by asJavaIterable.
 *
 * The underlying object is scala.collection.convert.Wrappers$IterableWrapper.
 * Kryo deserializes this into an AbstractCollection, which unfortunately doesn't work.
 */
private class JavaIterableWrapperSerializer
  extends com.esotericsoftware.kryo.Serializer[java.lang.Iterable[_]] {

  import JavaIterableWrapperSerializer._

  /**
   * For the wrapper class, writes the wrapped Scala Iterable (obtained via the reflected
   * `underlying` accessor). Other objects are written as themselves.
   */
  override def write(kryo: Kryo, out: KryoOutput, obj: java.lang.Iterable[_]): Unit = {
    val isWrapper = obj.getClass == wrapperClass
    underlyingMethodOpt match {
      case Some(underlying) if isWrapper =>
        kryo.writeClassAndObject(out, underlying.invoke(obj))
      case None if isWrapper =>
        // The accessor could not be found. Writing `obj` itself would dispatch straight back to
        // this serializer (it is registered for wrapperClass) and recurse forever, so copy the
        // elements into a plain java.util.ArrayList and write that instead.
        val copy = new java.util.ArrayList[Any]()
        val it = obj.iterator()
        while (it.hasNext) {
          copy.add(it.next())
        }
        kryo.writeClassAndObject(out, copy)
      case _ =>
        kryo.writeClassAndObject(out, obj)
    }
  }

  /**
   * Reads back what `write` produced: a Scala Iterable is re-wrapped as a Java Iterable, a Java
   * Iterable is returned as-is, and a null payload yields null.
   *
   * @throws KryoException if the payload is neither a Scala nor a Java Iterable.
   */
  override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
    : java.lang.Iterable[_] = {
    kryo.readClassAndObject(in) match {
      case scalaIterable: Iterable[_] => scalaIterable.asJava
      case javaIterable: java.lang.Iterable[_] => javaIterable
      case null => null
      case other =>
        throw new KryoException(
          s"Expected a Scala or Java Iterable but deserialized ${other.getClass.getName}")
    }
  }
}
490
private object JavaIterableWrapperSerializer extends Logging {

  /**
   * Runtime class of the Java view produced by `WrapAsJava.asJavaIterable`
   * (scala.collection.convert.Wrappers$IterableWrapper), found by wrapping a sample Seq.
   */
  val wrapperClass = {
    val sampleWrapper = scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1))
    sampleWrapper.getClass
  }

  /**
   * Reflective handle to the wrapper's `underlying` accessor, used to recover the Scala
   * collection before serializing it. None (after logging the error) if the lookup failed.
   */
  private val underlyingMethodOpt = {
    val lookup: Option[java.lang.reflect.Method] =
      try {
        Some(wrapperClass.getDeclaredMethod("underlying"))
      } catch {
        case e: Exception =>
          logError("Failed to find the underlying field in " + wrapperClass, e)
          None
      }
    lookup
  }
}
506