/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.serializer

import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.storage._
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

/**
 * Component which configures serialization, compression and encryption for various Spark
 * components, including automatic selection of which [[Serializer]] to use for shuffles.
 *
 * @param defaultSerializer serializer used whenever Kryo is not automatically picked
 * @param conf configuration from which the compression flags and the codec are read
 * @param encryptionKey key used to wrap streams for encryption; `None` disables encryption
 */
private[spark] class SerializerManager(
    defaultSerializer: Serializer,
    conf: SparkConf,
    encryptionKey: Option[Array[Byte]]) {

  /** Creates a manager without an encryption key, i.e. with encryption disabled. */
  def this(defaultSerializer: Serializer, conf: SparkConf) = this(defaultSerializer, conf, None)

  private[this] val kryoSerializer = new KryoSerializer(conf)

  private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]

  // Class tags of the primitive types listed below, plus the tags of arrays of those types.
  private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
    val primitiveClassTags = Set[ClassTag[_]](
      ClassTag.Boolean,
      ClassTag.Byte,
      ClassTag.Char,
      ClassTag.Double,
      ClassTag.Float,
      ClassTag.Int,
      ClassTag.Long,
      ClassTag.Null,
      ClassTag.Short
    )
    val arrayClassTags = primitiveClassTags.map(_.wrap)
    primitiveClassTags ++ arrayClassTags
  }

  // Whether to compress broadcast variables that are stored
  private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  // Whether to compress shuffle output that is stored
  private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  // Whether to compress RDD partitions that are stored serialized
  private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  // Whether to compress shuffle output temporarily spilled to disk
  private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

  /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
   * the initialization of the compression codec until it is first used. The reason is that a Spark
   * program could be using a user-defined codec in a third party jar, which is loaded in
   * Executor.updateDependencies. When the BlockManager is initialized, user level jars haven't
   * been loaded yet.
   */
  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

  /** Returns true when an encryption key was supplied to this manager. */
  def encryptionEnabled: Boolean = encryptionKey.isDefined

  /**
   * Returns true when `ct` is the class tag of a String, of one of the primitive types tracked
   * by this manager, or of an array of such a primitive type.
   */
  def canUseKryo(ct: ClassTag[_]): Boolean = {
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
  }

  // SPARK-18617: The feature from SPARK-13990 cannot be applied to Spark Streaming yet. In the
  // worst case a streaming job based on `Receiver` mode cannot run properly on Spark 2.x, so as
  // a first step the Kryo auto-pick feature is switched off for streaming (see `autoPick`).
  /**
   * Picks the serializer for values described by `ct`.
   *
   * @param ct class tag of the values to serialize
   * @param autoPick when true, Kryo is chosen if [[canUseKryo]] accepts `ct`; when false the
   *                 default serializer is always returned
   */
  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
    if (autoPick && canUseKryo(ct)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

  /**
   * Pick the best serializer for shuffling an RDD of key-value pairs. Kryo is used only when
   * both the key and the value class tags are accepted by [[canUseKryo]].
   */
  def getSerializer(keyClassTag: ClassTag[_], valueClassTag: ClassTag[_]): Serializer = {
    if (canUseKryo(keyClassTag) && canUseKryo(valueClassTag)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

  /**
   * Maps a block id to the compression flag configured for its block type. Block types that are
   * not listed here are never compressed.
   */
  private def shouldCompress(blockId: BlockId): Boolean = {
    blockId match {
      case _: ShuffleBlockId => compressShuffle
      case _: BroadcastBlockId => compressBroadcast
      case _: RDDBlockId => compressRdds
      case _: TempLocalBlockId => compressShuffleSpill
      case _: TempShuffleBlockId => compressShuffle
      case _ => false
    }
  }

  /**
   * Wrap an input stream for encryption and compression
   */
  def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
    wrapForCompression(blockId, wrapForEncryption(s))
  }

  /**
   * Wrap an output stream for encryption and compression
   */
  def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
    wrapForCompression(blockId, wrapForEncryption(s))
  }

  /**
   * Wrap an input stream for encryption if an encryption key is present; otherwise the stream
   * is returned unchanged.
   */
  def wrapForEncryption(s: InputStream): InputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }
      .getOrElse(s)
  }

  /**
   * Wrap an output stream for encryption if an encryption key is present; otherwise the stream
   * is returned unchanged.
   */
  def wrapForEncryption(s: OutputStream): OutputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }
      .getOrElse(s)
  }

  /**
   * Wrap an output stream for compression if block compression is enabled for its block type
   */
  private[this] def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
  }

  /**
   * Wrap an input stream for compression if block compression is enabled for its block type
   */
  private[this] def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
  }

  /**
   * Serializes `values` into `outputStream`. The output is buffered, then wrapped for encryption
   * and compression via [[wrapStream]]; the serialization stream is closed once all values have
   * been written.
   */
  def dataSerializeStream[T: ClassTag](
      blockId: BlockId,
      outputStream: OutputStream,
      values: Iterator[T]): Unit = {
    val byteStream = new BufferedOutputStream(outputStream)
    // Kryo auto-pick is disabled for streaming blocks (SPARK-18617).
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
  }

  /** Serializes into a chunked byte buffer, taking the class tag from the context bound. */
  def dataSerialize[T: ClassTag](
      blockId: BlockId,
      values: Iterator[T],
      allowEncryption: Boolean = true): ChunkedByteBuffer = {
    dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]],
      allowEncryption = allowEncryption)
  }

  /**
   * Serializes into a chunked byte buffer.
   *
   * @param blockId block the data belongs to; selects compression and Kryo auto-pick
   * @param values values to serialize
   * @param classTag class tag of the values, used to pick the serializer
   * @param allowEncryption when false, the output is not wrapped for encryption even if an
   *                        encryption key is present
   */
  def dataSerializeWithExplicitClassTag(
      blockId: BlockId,
      values: Iterator[_],
      classTag: ClassTag[_],
      allowEncryption: Boolean = true): ChunkedByteBuffer = {
    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
    val byteStream = new BufferedOutputStream(bbos)
    // Kryo auto-pick is disabled for streaming blocks (SPARK-18617).
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(classTag, autoPick).newInstance()
    val encrypted = if (allowEncryption) wrapForEncryption(byteStream) else byteStream
    ser.serializeStream(wrapForCompression(blockId, encrypted)).writeAll(values).close()
    bbos.toChunkedByteBuffer
  }

  /**
   * Deserializes an InputStream into an iterator of values and disposes of it when the end of
   * the iterator is reached.
   *
   * @param blockId block the data belongs to; selects compression and Kryo auto-pick
   * @param inputStream stream holding the serialized data
   * @param maybeEncrypted when false, the input is not wrapped for decryption even if an
   *                       encryption key is present
   * @param classTag class tag of the values, used to pick the serializer
   */
  def dataDeserializeStream[T](
      blockId: BlockId,
      inputStream: InputStream,
      maybeEncrypted: Boolean = true)
      (classTag: ClassTag[T]): Iterator[T] = {
    val stream = new BufferedInputStream(inputStream)
    // Kryo auto-pick is disabled for streaming blocks (SPARK-18617).
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    // Read through the buffered stream, mirroring the buffering applied on the write path.
    // Previously the raw `inputStream` was used here and the buffer was created but never read.
    val decrypted = if (maybeEncrypted) wrapForEncryption(stream) else stream
    getSerializer(classTag, autoPick)
      .newInstance()
      .deserializeStream(wrapForCompression(blockId, decrypted))
      .asIterator.asInstanceOf[Iterator[T]]
  }
}