1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.serializer
19
20import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
21import java.nio.ByteBuffer
22
23import scala.reflect.ClassTag
24
25import org.apache.spark.SparkConf
26import org.apache.spark.internal.config._
27import org.apache.spark.io.CompressionCodec
28import org.apache.spark.security.CryptoStreamUtils
29import org.apache.spark.storage._
30import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
31
32/**
33 * Component which configures serialization, compression and encryption for various Spark
34 * components, including automatic selection of which [[Serializer]] to use for shuffles.
35 */
private[spark] class SerializerManager(
    defaultSerializer: Serializer,
    conf: SparkConf,
    encryptionKey: Option[Array[Byte]]) {

  /** Creates a manager without an encryption key, i.e. with stream encryption disabled. */
  def this(defaultSerializer: Serializer, conf: SparkConf) = this(defaultSerializer, conf, None)

  // Serializer returned instead of `defaultSerializer` when `canUseKryo` accepts a ClassTag.
  private[this] val kryoSerializer = new KryoSerializer(conf)

  private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]

  // ClassTags of the primitive types (plus Null) and of arrays of those types. Together with
  // String, these are the element types for which Kryo may be picked automatically.
  private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
    val primitiveClassTags = Set[ClassTag[_]](
      ClassTag.Boolean,
      ClassTag.Byte,
      ClassTag.Char,
      ClassTag.Double,
      ClassTag.Float,
      ClassTag.Int,
      ClassTag.Long,
      ClassTag.Null,
      ClassTag.Short
    )
    val arrayClassTags = primitiveClassTags.map(_.wrap)
    primitiveClassTags ++ arrayClassTags
  }

  // Whether to compress stored broadcast variables
  private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  // Whether to compress stored shuffle output
  private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  // Whether to compress RDD partitions that are stored serialized
  private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  // Whether to compress shuffle output temporarily spilled to disk
  private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

  /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
   * the initialization of the compression codec until it is first used. The reason is that a Spark
   * program could be using a user-defined codec in a third-party jar, which is loaded in
   * Executor.updateDependencies. When the BlockManager is initialized, user-level jars haven't
   * been loaded yet. */
  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

  /** True when an encryption key was supplied, i.e. when the stream wrappers will encrypt. */
  def encryptionEnabled: Boolean = encryptionKey.isDefined

  /**
   * Whether values of the given type may be serialized with Kryo automatically: true for
   * String, for the primitive types (and Null), and for arrays of those primitive types.
   */
  def canUseKryo(ct: ClassTag[_]): Boolean = {
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
  }

  // SPARK-18617: the Kryo auto-pick feature from SPARK-13990 cannot yet be applied to Spark
  // Streaming; at worst, `Receiver`-based streaming jobs fail to run on Spark 2.x. As a first
  // step, auto-pick is therefore switched off for streaming: the (de)serialization helpers in
  // this class pass `autoPick = false` for `StreamBlockId`s.
  /**
   * Returns the Kryo serializer when `autoPick` is set and `ct` is accepted by [[canUseKryo]];
   * otherwise returns the default serializer.
   */
  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
    if (autoPick && canUseKryo(ct)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

  /**
   * Pick the best serializer for shuffling an RDD of key-value pairs: Kryo only when both the key
   * and the value types are accepted by [[canUseKryo]], the default serializer otherwise.
   */
  def getSerializer(keyClassTag: ClassTag[_], valueClassTag: ClassTag[_]): Serializer = {
    if (canUseKryo(keyClassTag) && canUseKryo(valueClassTag)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

  /** Whether blocks of this kind are compressed, according to the matching config flag. */
  private def shouldCompress(blockId: BlockId): Boolean = {
    blockId match {
      case _: ShuffleBlockId => compressShuffle
      case _: BroadcastBlockId => compressBroadcast
      case _: RDDBlockId => compressRdds
      case _: TempLocalBlockId => compressShuffleSpill
      case _: TempShuffleBlockId => compressShuffle
      case _ => false
    }
  }

  /**
   * Wrap an input stream for decryption and decompression. Decryption is applied first (closest
   * to the raw bytes), then decompression.
   */
  def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
    wrapForCompression(blockId, wrapForEncryption(s))
  }

  /**
   * Wrap an output stream for compression and encryption. Data is compressed first and the
   * compressed bytes are then encrypted.
   */
  def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
    wrapForCompression(blockId, wrapForEncryption(s))
  }

  /**
   * Wrap an input stream for decryption if an encryption key was supplied; otherwise return the
   * stream unchanged.
   */
  def wrapForEncryption(s: InputStream): InputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }
      .getOrElse(s)
  }

  /**
   * Wrap an output stream for encryption if an encryption key was supplied; otherwise return the
   * stream unchanged.
   */
  def wrapForEncryption(s: OutputStream): OutputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }
      .getOrElse(s)
  }

  /**
   * Wrap an output stream for compression if block compression is enabled for its block type
   */
  private[this] def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
  }

  /**
   * Wrap an input stream for decompression if block compression is enabled for its block type
   */
  private[this] def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
  }

  /**
   * Serializes `values` into `outputStream`, applying compression/encryption as configured for
   * `blockId`. The serialization stream is closed when done, which closes `outputStream` too.
   */
  def dataSerializeStream[T: ClassTag](
      blockId: BlockId,
      outputStream: OutputStream,
      values: Iterator[T]): Unit = {
    val byteStream = new BufferedOutputStream(outputStream)
    // Kryo auto-pick is disabled for streaming blocks (SPARK-18617).
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
  }

  /** Serializes into a chunked byte buffer. */
  def dataSerialize[T: ClassTag](
      blockId: BlockId,
      values: Iterator[T],
      allowEncryption: Boolean = true): ChunkedByteBuffer = {
    dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]],
      allowEncryption = allowEncryption)
  }

  /**
   * Serializes into a chunked byte buffer (4 MiB chunks), using an explicitly supplied ClassTag
   * to pick the serializer. Encryption is applied only when `allowEncryption` is true and a key
   * is configured; compression follows the block type.
   */
  def dataSerializeWithExplicitClassTag(
      blockId: BlockId,
      values: Iterator[_],
      classTag: ClassTag[_],
      allowEncryption: Boolean = true): ChunkedByteBuffer = {
    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
    val byteStream = new BufferedOutputStream(bbos)
    // Kryo auto-pick is disabled for streaming blocks (SPARK-18617).
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(classTag, autoPick).newInstance()
    val encrypted = if (allowEncryption) wrapForEncryption(byteStream) else byteStream
    ser.serializeStream(wrapForCompression(blockId, encrypted)).writeAll(values).close()
    bbos.toChunkedByteBuffer
  }

  /**
   * Deserializes an InputStream into an iterator of values and disposes of it when the end of
   * the iterator is reached.
   *
   * @param blockId        id of the block being read; selects decompression and whether Kryo
   *                       may be auto-picked (never for streaming blocks)
   * @param inputStream    raw block bytes
   * @param maybeEncrypted whether to apply the decryption wrapper (only has an effect when an
   *                       encryption key is configured)
   * @param classTag       element type, used to pick the serializer
   */
  def dataDeserializeStream[T](
      blockId: BlockId,
      inputStream: InputStream,
      maybeEncrypted: Boolean = true)
      (classTag: ClassTag[T]): Iterator[T] = {
    // Buffer the raw bytes once, underneath the decryption/decompression layers, mirroring the
    // BufferedOutputStream used on the write path. The buffered stream must be the one that is
    // wrapped; wrapping `inputStream` directly would leave this buffer unused.
    val stream = new BufferedInputStream(inputStream)
    // Kryo auto-pick is disabled for streaming blocks (SPARK-18617).
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val decrypted = if (maybeEncrypted) wrapForEncryption(stream) else stream
    getSerializer(classTag, autoPick)
      .newInstance()
      .deserializeStream(wrapForCompression(blockId, decrypted))
      .asIterator.asInstanceOf[Iterator[T]]
  }
}
216