/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}

import scala.collection.{mutable, Map}
import scala.reflect.{classTag, ClassTag}

import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.ThreadUtils

/**
 * ReliableKafkaReceiver offers the ability to reliably store data into the BlockManager
 * without loss. It is turned off by default and is enabled only when
 * spark.streaming.receiver.writeAheadLog.enable is true. Unlike KafkaReceiver, this receiver
 * manages the topic-partition offsets itself and updates the offset information only after
 * data has been reliably stored in the write-ahead log. Since offsets are updated only once
 * data is reliably stored, the potential data loss problem of KafkaReceiver is eliminated.
 *
 * Note: ReliableKafkaReceiver sets auto.commit.enable to false to turn off the automatic
 * offset commit mechanism in the Kafka consumer, so setting this configuration manually
 * within kafkaParams will not take effect.
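 *
 * A minimal usage sketch (illustrative only; the ZooKeeper quorum, group id, topic map and
 * checkpoint directory below are placeholder values):
 * {{{
 *   val conf = new SparkConf()
 *     .setAppName("ReliableKafkaExample")
 *     .set("spark.streaming.receiver.writeAheadLog.enable", "true")
 *   val ssc = new StreamingContext(conf, Seconds(2))
 *   // The write-ahead log is written under the checkpoint directory.
 *   ssc.checkpoint("hdfs:///tmp/reliable-kafka-checkpoint")
 *   val stream = KafkaUtils.createStream(ssc, "zkHost:2181", "example-group", Map("topic" -> 1))
 * }}}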
 */
private[streaming]
class ReliableKafkaReceiver[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel)
    extends Receiver[(K, V)](storageLevel) with Logging {

  private val groupId = kafkaParams("group.id")
  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
  private def conf = SparkEnv.get.conf

  /** High level consumer to connect to Kafka. */
  private var consumerConnector: ConsumerConnector = null

  /** zkClient to connect to Zookeeper to commit the offsets. */
  private var zkClient: ZkClient = null

  /**
   * A HashMap that tracks the latest offset for each topic/partition. It is only accessed
   * from the BlockGenerator's synchronized callbacks, so a mutable HashMap does not cause
   * concurrency issues.
   */
  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null

  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null

  /**
   * The BlockGenerator is managed by the receiver itself so that block storage and offset
   * commits can be coordinated.
   */
  private var blockGenerator: BlockGenerator = null

  /** Thread pool running the handlers for receiving messages from multiple topics and partitions. */
  private var messageHandlerThreadPool: ThreadPoolExecutor = null

  override def onStart(): Unit = {
    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

    // Initialize the topic-partition / offset hash map.
    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

    // Initialize the stream block id / offset snapshot hash map.
    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

    // Initialize the block generator for storing Kafka messages.
    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver; " +
        "it will be overridden to false to turn off Kafka's auto offset commit")
    }

    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))
    // Override "auto.commit.enable" to "false" regardless of what the user set it to; auto
    // commit must stay disabled so that offsets are committed only after data has been
    // reliably stored.
    props.setProperty(AUTO_OFFSET_COMMIT, "false")

    val consumerConfig = new ConsumerConfig(props)

    assert(!consumerConfig.autoCommitEnable)

    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
    consumerConnector = Consumer.create(consumerConfig)
    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")

    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

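    // One daemon handler thread per requested consumer thread, summed across all topics, so
    // that every Kafka stream created below gets its own handler.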
    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
      topics.values.sum, "KafkaMessageHandler")

    blockGenerator.start()

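    // Instantiate the key and value decoders reflectively; Kafka decoders expose a constructor
    // taking VerifiableProperties, which is invoked here with the consumer configuration.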
    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]

    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

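    // Create the Kafka streams: one KafkaStream per consumer thread requested for each topic.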
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

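    // Hand each stream to a dedicated MessageHandler so all partitions are consumed concurrently.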
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        messageHandlerThreadPool.submit(new MessageHandler(stream))
      }
    }
  }

  override def onStop(): Unit = {
    if (messageHandlerThreadPool != null) {
      messageHandlerThreadPool.shutdown()
      messageHandlerThreadPool = null
    }

    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }

    if (zkClient != null) {
      zkClient.close()
      zkClient = null
    }

    if (blockGenerator != null) {
      blockGenerator.stop()
      blockGenerator = null
    }

    if (topicPartitionOffsetMap != null) {
      topicPartitionOffsetMap.clear()
      topicPartitionOffsetMap = null
    }

    if (blockOffsetMap != null) {
      blockOffsetMap.clear()
      blockOffsetMap = null
    }
  }

  /** Store a Kafka message and the associated metadata as a tuple. */
  private def storeMessageAndMetadata(
      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
    val data = (msgAndMetadata.key, msgAndMetadata.message)
    val metadata = (topicAndPartition, msgAndMetadata.offset)
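    // addDataWithCallback invokes GeneratedBlockHandler.onAddData under the block generator's
    // lock, so the offset snapshot stays consistent with the buffered data.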
    blockGenerator.addDataWithCallback(data, metadata)
  }

  /** Update stored offset */
  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
    topicPartitionOffsetMap.put(topicAndPartition, offset)
  }

  /**
   * Remember the current offsets for each topic and partition. This is called when a block is
   * generated.
   */
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }

  /**
   * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
   * tries up to four times to push the block; if all attempts fail, the receiver is stopped.
   */
  private def storeBlockAndCommitOffset(
      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    var count = 0
    var pushed = false
    var exception: Exception = null
    while (!pushed && count <= 3) {
      try {
        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
        pushed = true
      } catch {
        case ex: Exception =>
          count += 1
          exception = ex
      }
    }
    if (pushed) {
      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
      blockOffsetMap.remove(blockId)
    } else {
      stop("Error while storing block into Spark", exception)
    }
  }

  /**
   * Commit the offsets of Kafka's topic/partitions; the commit mechanism follows Kafka 0.8.x's
   * metadata schema in Zookeeper.
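   * Each offset is written to the path `/consumers/[groupId]/offsets/[topic]/[partition]`.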
   */
  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
    if (zkClient == null) {
      val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
      stop("Zookeeper client is not initialized before committing offsets to ZK", thrown)
      return
    }

    for ((topicAndPart, offset) <- offsetMap) {
      try {
        val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
        val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"

        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
        logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
          s"partition ${topicAndPart.partition}")
      } catch {
        case e: Exception =>
          logWarning(s"Exception during commit offset $offset for topic " +
            s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
      }
    }
  }


  /** Class to handle received Kafka messages. */
  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
    override def run(): Unit = {
      while (!isStopped) {
        try {
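          // The Kafka stream iterator blocks in hasNext() until a message arrives; each message
          // is handed to the block generator together with its topic, partition and offset.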
          val streamIterator = stream.iterator()
          while (streamIterator.hasNext) {
            storeMessageAndMetadata(streamIterator.next)
          }
        } catch {
          case e: Exception =>
            reportError("Error handling message", e)
        }
      }
    }
  }

  /** Class to handle blocks generated by the block generator. */
  private final class GeneratedBlockHandler extends BlockGeneratorListener {

    def onAddData(data: Any, metadata: Any): Unit = {
      // Update the offset of the data that was added to the generator
      if (metadata != null) {
        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
        updateOffset(topicAndPartition, offset)
      }
    }

    def onGenerateBlock(blockId: StreamBlockId): Unit = {
      // Remember the offsets of topics/partitions when a block has been generated
      rememberBlockOffsets(blockId)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
      // Store the block and commit its offsets
      storeBlockAndCommitOffset(blockId, arrayBuffer)
    }

    def onError(message: String, throwable: Throwable): Unit = {
      reportError(message, throwable)
    }
  }
}