/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.serializer

import java.io._
import java.nio.ByteBuffer
import java.util.Locale
import javax.annotation.Nullable

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.RoaringBitmap

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf,
  Utils}
import org.apache.spark.util.collection.CompactBuffer

/**
 * A Spark serializer that uses the <a href="https://github.com/EsotericSoftware/kryo">
 * Kryo serialization library</a>.
 *
 * @note This serializer is not guaranteed to be wire-compatible across different versions of
 * Spark. It is intended to be used to serialize/de-serialize data within a single
 * Spark application.
 */
class KryoSerializer(conf: SparkConf)
  extends org.apache.spark.serializer.Serializer
  with Logging
  with Serializable {

  // Initial size of each Kryo output buffer, from "spark.kryoserializer.buffer" (default "64k").
  private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")

  if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
    throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
      s"2048 mb, got: ${ByteUnit.KiB.toMiB(bufferSizeKb)} mb.")
  }
  private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt

  /**
   * Maximum size, in MiB, that a Kryo output buffer may grow to, read from
   * "spark.kryoserializer.buffer.max" (default "64m").
   *
   * The range check is performed on the un-narrowed Long so that an oversized setting is
   * rejected instead of silently wrapping around when converted to Int.
   *
   * @throws IllegalArgumentException if the configured value is 2048 MiB or larger.
   */
  val maxBufferSizeMb: Int = {
    val requestedMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m")
    if (requestedMb >= ByteUnit.GiB.toMiB(2)) {
      throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
        s"2048 mb, got: $requestedMb mb.")
    }
    requestedMb.toInt
  }
  private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt

  private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
  // Comma-separated class names of user KryoRegistrator implementations; blanks are dropped.
  private val userRegistrators = conf.get("spark.kryo.registrator", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)
  // Comma-separated class names to register directly with Kryo; blanks are dropped.
  private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)

  private val avroSchemas = conf.getAvroSchema
  // whether to use unsafe based IO for serialization
  private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)

  /**
   * Creates a Kryo output buffer that starts at the configured buffer size and may grow up to the
   * configured maximum (never less than the initial size). Uses Kryo's unsafe output when
   * "spark.kryo.unsafe" is enabled.
   */
  def newKryoOutput(): KryoOutput =
    if (useUnsafe) {
      new KryoUnsafeOutput(bufferSize, math.max(bufferSize, maxBufferSize))
    } else {
      new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
    }

  /**
   * Builds a fully configured [[Kryo]] instance: Spark's commonly used classes, the
   * Java-serialized wrappers, Avro serializers, user-supplied classes and registrators, and
   * finally Chill's Scala registrations plus a few types Chill misses.
   *
   * @throws SparkException if a user class or registrator cannot be loaded or fails to register.
   */
  def newKryo(): Kryo = {
    val instantiator = new EmptyScalaKryoInstantiator
    val kryo = instantiator.newKryo()
    kryo.setRegistrationRequired(registrationRequired)

    val oldClassLoader = Thread.currentThread.getContextClassLoader
    val classLoader = defaultClassLoader.getOrElse(oldClassLoader)

    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
    // Do this before we invoke the user registrator so the user registrator can override this.
    kryo.setReferences(referenceTracking)

    for (cls <- KryoSerializer.toRegister) {
      kryo.register(cls)
    }
    for ((cls, ser) <- KryoSerializer.toRegisterSerializer) {
      kryo.register(cls, ser)
    }

    // For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
    kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)

    // Allow sending classes with custom Java serializers
    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
    kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
    kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
    kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))

    try {
      // scalastyle:off classforname
      // Use the default classloader when calling the user registrator.
      Thread.currentThread.setContextClassLoader(classLoader)
      // Register classes given through spark.kryo.classesToRegister.
      classesToRegister
        .foreach { className => kryo.register(Class.forName(className, true, classLoader)) }
      // Allow the user to register their own classes by setting spark.kryo.registrator.
      userRegistrators
        .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator])
        .foreach { reg => reg.registerClasses(kryo) }
      // scalastyle:on classforname
    } catch {
      case e: Exception =>
        throw new SparkException("Failed to register classes with Kryo", e)
    } finally {
      // Always restore the caller's context class loader, even if registration failed.
      Thread.currentThread.setContextClassLoader(oldClassLoader)
    }

    // Register Chill's classes; we do this after our ranges and the user's own classes to let
    // our code override the generic serializers in Chill for things like Seq
    new AllScalaRegistrar().apply(kryo)

    // Register types missed by Chill.
    // scalastyle:off
    kryo.register(classOf[Array[Tuple1[Any]]])
    kryo.register(classOf[Array[Tuple2[Any, Any]]])
    kryo.register(classOf[Array[Tuple3[Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple4[Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple5[Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple6[Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple7[Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple8[Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple9[Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
    kryo.register(classOf[Array[Tuple22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])

    // scalastyle:on

    kryo.register(None.getClass)
    kryo.register(Nil.getClass)
    kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
    kryo.register(classOf[ArrayBuffer[Any]])

    kryo.setClassLoader(classLoader)
    kryo
  }

  override def newInstance(): SerializerInstance = {
    new KryoSerializerInstance(this, useUnsafe)
  }

  private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
  }
}

/**
 * Writes a sequence of objects to `outStream` using a [[Kryo]] borrowed from `serInstance`.
 * The Kryo instance is returned to `serInstance` when the stream is closed.
 */
private[spark]
class KryoSerializationStream(
    serInstance: KryoSerializerInstance,
    outStream: OutputStream,
    useUnsafe: Boolean) extends SerializationStream {

  private[this] var output: KryoOutput =
    if (useUnsafe) new KryoUnsafeOutput(outStream) else new KryoOutput(outStream)

  private[this] var kryo: Kryo = serInstance.borrowKryo()

  override def writeObject[T: ClassTag](t: T): SerializationStream = {
    kryo.writeClassAndObject(output, t)
    this
  }

  override def flush() {
    if (output == null) {
      throw new IOException("Stream is closed")
    }
    output.flush()
  }

  override def close() {
    if (output != null) {
      try {
        output.close()
      } finally {
        // Release the Kryo even if closing the output failed; nulling the fields makes a second
        // close() a no-op.
        serInstance.releaseKryo(kryo)
        kryo = null
        output = null
      }
    }
  }
}

/**
 * Reads a sequence of objects from `inStream` using a [[Kryo]] borrowed from `serInstance`.
 * The Kryo instance is returned to `serInstance` when the stream is closed.
 */
private[spark]
class KryoDeserializationStream(
    serInstance: KryoSerializerInstance,
    inStream: InputStream,
    useUnsafe: Boolean) extends DeserializationStream {

  private[this] var input: KryoInput =
    if (useUnsafe) new KryoUnsafeInput(inStream) else new KryoInput(inStream)

  private[this] var kryo: Kryo = serInstance.borrowKryo()

  override def readObject[T: ClassTag](): T = {
    try {
      kryo.readClassAndObject(input).asInstanceOf[T]
    } catch {
      // DeserializationStream uses the EOF exception to indicate stopping condition.
      // The message is matched null-safely and lower-cased with Locale.ROOT so the check does not
      // depend on the JVM's default locale.
      case e: KryoException
          if Option(e.getMessage).exists(_.toLowerCase(Locale.ROOT).contains("buffer underflow")) =>
        throw new EOFException
    }
  }

  override def close() {
    if (input != null) {
      try {
        // Kryo's Input automatically closes the input stream it is using.
        input.close()
      } finally {
        serInstance.releaseKryo(kryo)
        kryo = null
        input = null
      }
    }
  }
}

private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
  extends SerializerInstance {
  /**
   * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
   * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
   * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
   * not synchronized.
   */
  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()

  /**
   * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
   * otherwise, it allocates a new instance.
   */
  private[serializer] def borrowKryo(): Kryo = {
    if (cachedKryo != null) {
      val kryo = cachedKryo
      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
      kryo.reset()
      cachedKryo = null
      kryo
    } else {
      ks.newKryo()
    }
  }

  /**
   * Release a borrowed [[Kryo]] instance. If this serializer instance already has a cached Kryo
   * instance, then the given Kryo instance is discarded; otherwise, the Kryo is stored for later
   * re-use.
   */
  private[serializer] def releaseKryo(kryo: Kryo): Unit = {
    if (cachedKryo == null) {
      cachedKryo = kryo
    }
  }

  // Make these lazy vals to avoid creating a buffer unless we use them.
  private lazy val output = ks.newKryoOutput()
  private lazy val input = if (useUnsafe) new KryoUnsafeInput() else new KryoInput()

  /**
   * Points the shared `input` at the readable bytes of `bytes` without changing its position.
   * Heap buffers are read in place; buffers without an accessible backing array (direct or
   * read-only buffers, for which `array()` throws) have their remaining bytes copied first.
   */
  private def setInputBuffer(bytes: ByteBuffer): Unit = {
    if (bytes.hasArray) {
      input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    } else {
      val copy = new Array[Byte](bytes.remaining())
      // duplicate() so the caller's buffer position is left untouched, as on the array path.
      bytes.duplicate().get(copy)
      input.setBuffer(copy, 0, copy.length)
    }
  }

  /**
   * Serializes `t` (with its class) into a fresh heap ByteBuffer.
   *
   * @throws SparkException if the object does not fit within spark.kryoserializer.buffer.max.
   */
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    output.clear()
    val kryo = borrowKryo()
    try {
      kryo.writeClassAndObject(output, t)
    } catch {
      // Null-safe: a KryoException without a message must propagate unchanged, not as an NPE.
      case e: KryoException if Option(e.getMessage).exists(_.startsWith("Buffer overflow")) =>
        throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
          "increase spark.kryoserializer.buffer.max value.")
    } finally {
      releaseKryo(kryo)
    }
    ByteBuffer.wrap(output.toBytes)
  }

  /** Deserializes one object (written with its class) from the remaining bytes of `bytes`. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val kryo = borrowKryo()
    try {
      setInputBuffer(bytes)
      kryo.readClassAndObject(input).asInstanceOf[T]
    } finally {
      releaseKryo(kryo)
    }
  }

  /**
   * Deserializes one object from the remaining bytes of `bytes`, resolving classes with `loader`.
   * The Kryo's previous class loader is restored afterwards.
   */
  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    val kryo = borrowKryo()
    val oldClassLoader = kryo.getClassLoader
    try {
      kryo.setClassLoader(loader)
      setInputBuffer(bytes)
      kryo.readClassAndObject(input).asInstanceOf[T]
    } finally {
      kryo.setClassLoader(oldClassLoader)
      releaseKryo(kryo)
    }
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    new KryoSerializationStream(this, s, useUnsafe)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new KryoDeserializationStream(this, s, useUnsafe)
  }

  /**
   * Returns true if auto-reset is on. The only reason this would be false is if the user-supplied
   * registrator explicitly turns auto-reset off.
   */
  def getAutoReset(): Boolean = {
    // Kryo exposes no getter for this flag, so read the private field reflectively.
    val field = classOf[Kryo].getDeclaredField("autoReset")
    field.setAccessible(true)
    val kryo = borrowKryo()
    try {
      field.get(kryo).asInstanceOf[Boolean]
    } finally {
      releaseKryo(kryo)
    }
  }
}

/**
 * Interface implemented by clients to register their classes with Kryo when using Kryo
 * serialization.
 */
trait KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}

private[serializer] object KryoSerializer {
  // Commonly used classes.
  private val toRegister: Seq[Class[_]] = Seq(
    ByteBuffer.allocate(1).getClass,
    classOf[StorageLevel],
    classOf[CompressedMapStatus],
    classOf[HighlyCompressedMapStatus],
    classOf[CompactBuffer[_]],
    classOf[BlockManagerId],
    classOf[Array[Byte]],
    classOf[Array[Short]],
    classOf[Array[Long]],
    classOf[BoundedPriorityQueue[_]],
    classOf[SparkConf]
  )

  // Classes registered together with a custom serializer. RoaringBitmap is written through its
  // own serialize/deserialize methods via the ObjectOutput/ObjectInput bridges below.
  private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
    classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
      override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = {
        bitmap.serialize(new KryoOutputObjectOutputBridge(kryo, output))
      }
      override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
        val ret = new RoaringBitmap
        ret.deserialize(new KryoInputObjectInputBridge(kryo, input))
        ret
      }
    }
  )
}

/**
 * This is a bridge class to wrap KryoInput as an InputStream and ObjectInput. It forwards all
 * methods of InputStream and ObjectInput to KryoInput. It's usually helpful when an API expects
 * an InputStream or ObjectInput but you want to use Kryo.
 */
private[spark] class KryoInputObjectInputBridge(
    kryo: Kryo, input: KryoInput) extends FilterInputStream(input) with ObjectInput {
  override def readLong(): Long = input.readLong()
  override def readChar(): Char = input.readChar()
  override def readFloat(): Float = input.readFloat()
  override def readByte(): Byte = input.readByte()
  override def readShort(): Short = input.readShort()
  override def readUTF(): String = input.readString() // readString in kryo does utf8
  override def readInt(): Int = input.readInt()
  override def readUnsignedShort(): Int = input.readShortUnsigned()
  override def skipBytes(n: Int): Int = {
    input.skip(n)
    n
  }
  // DataInput.readFully must fill the whole range or fail. Kryo's read() may return after a
  // short read, whereas readBytes() fills exactly the requested count or throws.
  override def readFully(b: Array[Byte]): Unit = input.readBytes(b)
  override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.readBytes(b, off, len)
  override def readLine(): String = throw new UnsupportedOperationException("readLine")
  override def readBoolean(): Boolean = input.readBoolean()
  override def readUnsignedByte(): Int = input.readByteUnsigned()
  override def readDouble(): Double = input.readDouble()
  override def readObject(): AnyRef = kryo.readClassAndObject(input)
}

/**
 * This is a bridge class to wrap KryoOutput as an OutputStream and ObjectOutput. It forwards all
 * methods of OutputStream and ObjectOutput to KryoOutput. It's usually helpful when an API expects
 * an OutputStream or ObjectOutput but you want to use Kryo.
 */
private[spark] class KryoOutputObjectOutputBridge(
    kryo: Kryo, output: KryoOutput) extends FilterOutputStream(output) with ObjectOutput {
  override def writeFloat(v: Float): Unit = output.writeFloat(v)
  // There is no "readChars" counterpart, except maybe "readLine", which is not supported
  override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
  override def writeDouble(v: Double): Unit = output.writeDouble(v)
  override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
  override def writeShort(v: Int): Unit = output.writeShort(v)
  override def writeInt(v: Int): Unit = output.writeInt(v)
  override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
  override def write(b: Int): Unit = output.write(b)
  override def write(b: Array[Byte]): Unit = output.write(b)
  override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
  override def writeBytes(s: String): Unit = output.writeString(s)
  override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
  override def writeLong(v: Long): Unit = output.writeLong(v)
  override def writeByte(v: Int): Unit = output.writeByte(v)
  override def writeObject(obj: AnyRef): Unit = kryo.writeClassAndObject(output, obj)
}

/**
 * A Kryo serializer for serializing results returned by asJavaIterable.
 *
 * The underlying object is scala.collection.convert.Wrappers$IterableWrapper.
 * Kryo deserializes this into an AbstractCollection, which unfortunately doesn't work.
 */
private class JavaIterableWrapperSerializer
  extends com.esotericsoftware.kryo.Serializer[java.lang.Iterable[_]] {

  import JavaIterableWrapperSerializer._

  override def write(kryo: Kryo, out: KryoOutput, obj: java.lang.Iterable[_]): Unit = {
    // If the object is the wrapper, simply serialize the underlying Scala Iterable object.
    // Otherwise, serialize the object itself.
    // NOTE(review): this serializer is registered for `wrapperClass`, so if the reflective
    // `underlying` lookup failed, the else-branch appears to dispatch back here for wrapper
    // instances (unbounded recursion) — confirm whether that path needs an explicit error.
    if (obj.getClass == wrapperClass && underlyingMethodOpt.isDefined) {
      kryo.writeClassAndObject(out, underlyingMethodOpt.get.invoke(obj))
    } else {
      kryo.writeClassAndObject(out, obj)
    }
  }

  override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
    : java.lang.Iterable[_] = {
    kryo.readClassAndObject(in) match {
      case scalaIterable: Iterable[_] => scalaIterable.asJava
      case javaIterable: java.lang.Iterable[_] => javaIterable
    }
  }
}

private object JavaIterableWrapperSerializer extends Logging {
  // The class returned by JavaConverters.asJava
  // (scala.collection.convert.Wrappers$IterableWrapper).
  val wrapperClass =
    scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass

  // Get the underlying method so we can use it to get the Scala collection for serialization.
  // None (with an error logged) if the method cannot be found on this Scala version.
  private val underlyingMethodOpt = {
    try Some(wrapperClass.getDeclaredMethod("underlying")) catch {
      case e: Exception =>
        logError("Failed to find the underlying field in " + wrapperClass, e)
        None
    }
  }
}