/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}

import scala.collection.{mutable, Map}
import scala.reflect.{classTag, ClassTag}

import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.ThreadUtils

/**
 * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
 * It is turned off by default and will be enabled when
 * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
 * is that this receiver manages the topic-partition/offset state itself and only updates the
 * offset information after the data has been reliably stored in the write-ahead log, so the
 * potential data loss problem of KafkaReceiver is eliminated.
 *
 * Note: ReliableKafkaReceiver sets auto.commit.enable to false to turn off the automatic offset
 * commit mechanism of the Kafka consumer, so setting this configuration manually within
 * kafkaParams will not take effect.
 */
private[streaming]
class ReliableKafkaReceiver[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel)
  extends Receiver[(K, V)](storageLevel) with Logging {

  private val groupId = kafkaParams("group.id")
  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
  private def conf = SparkEnv.get.conf

  /** High level consumer to connect to Kafka. */
  private var consumerConnector: ConsumerConnector = null

  /** zkClient to connect to Zookeeper to commit the offsets. */
  private var zkClient: ZkClient = null

  /**
   * A HashMap to manage the offset for each topic/partition. This map is only accessed from
   * within synchronized blocks, so a mutable HashMap does not pose concurrency issues.
   */
  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null

  /** A concurrent HashMap to store the stream block id and the related offset snapshot. */
  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null

  /**
   * Manage the BlockGenerator inside the receiver itself to better coordinate block storing and
   * offset committing.
   */
  private var blockGenerator: BlockGenerator = null

  /** Thread pool running the handlers that receive messages from multiple topics and partitions. */
  private var messageHandlerThreadPool: ThreadPoolExecutor = null

  override def onStart(): Unit = {
    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

    // Initialize the topic-partition / offset hash map.
    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

    // Initialize the stream block id / offset snapshot hash map.
    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

    // Initialize the block generator for storing Kafka messages.
    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver; " +
        "it will be overridden to false to turn off auto offset commit in Kafka")
    }

    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))
    // Manually set "auto.commit.enable" to "false" even if the user explicitly set it to true;
    // this property must be false to turn off the auto commit mechanism in Kafka.
    props.setProperty(AUTO_OFFSET_COMMIT, "false")

    val consumerConfig = new ConsumerConfig(props)

    assert(!consumerConfig.autoCommitEnable)

    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
    consumerConnector = Consumer.create(consumerConfig)
    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")

    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
      topics.values.sum, "KafkaMessageHandler")

    blockGenerator.start()

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]

    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        messageHandlerThreadPool.submit(new MessageHandler(stream))
      }
    }
  }

  override def onStop(): Unit = {
    if (messageHandlerThreadPool != null) {
      messageHandlerThreadPool.shutdown()
      messageHandlerThreadPool = null
    }

    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }

    if (zkClient != null) {
      zkClient.close()
      zkClient = null
    }

    if (blockGenerator != null) {
      blockGenerator.stop()
      blockGenerator = null
    }

    if (topicPartitionOffsetMap != null) {
      topicPartitionOffsetMap.clear()
      topicPartitionOffsetMap = null
    }

    if (blockOffsetMap != null) {
      blockOffsetMap.clear()
      blockOffsetMap = null
    }
  }

  /** Store a Kafka message and the associated metadata as a tuple. */
  private def storeMessageAndMetadata(
      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
    val data = (msgAndMetadata.key, msgAndMetadata.message)
    val metadata = (topicAndPartition, msgAndMetadata.offset)
    blockGenerator.addDataWithCallback(data, metadata)
  }

  /** Update the stored offset for the given topic and partition. */
  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
    topicPartitionOffsetMap.put(topicAndPartition, offset)
  }

  /**
   * Remember the current offsets for each topic and partition. This is called when a block is
   * generated.
   */
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of the current offset map and store it with the related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }

  /**
   * Store the ready-to-be-stored block and commit the related offsets to Zookeeper. This method
   * will try a fixed number of times to push the block. If the push fails, the receiver is
   * stopped.
   */
  private def storeBlockAndCommitOffset(
      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    var count = 0
    var pushed = false
    var exception: Exception = null
    while (!pushed && count <= 3) {
      try {
        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
        pushed = true
      } catch {
        case ex: Exception =>
          count += 1
          exception = ex
      }
    }
    if (pushed) {
      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
      blockOffsetMap.remove(blockId)
    } else {
      stop("Error while storing block into Spark", exception)
    }
  }

  /**
   * Commit the offsets of Kafka topics/partitions. The commit mechanism follows Kafka 0.8.x's
   * offset metadata schema in Zookeeper.
   */
  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
    if (zkClient == null) {
      val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
      stop("Zookeeper client is not initialized before committing offsets to ZK", thrown)
      return
    }

    for ((topicAndPart, offset) <- offsetMap) {
      try {
        val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
        val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"

        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)

        logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
          s"partition ${topicAndPart.partition}")
      } catch {
        case e: Exception =>
          logWarning(s"Exception during commit offset $offset for topic " +
            s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
      }
    }
  }

  /** Class to handle received Kafka messages. */
  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
    override def run(): Unit = {
      while (!isStopped) {
        try {
          val streamIterator = stream.iterator()
          while (streamIterator.hasNext) {
            storeMessageAndMetadata(streamIterator.next)
          }
        } catch {
          case e: Exception =>
            reportError("Error handling message", e)
        }
      }
    }
  }

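  // Illustrative walk-through of the offset bookkeeping done by the helpers above and the
  // GeneratedBlockHandler below (hypothetical topic and offset values, comment only):
  //   1. Messages from TopicAndPartition("logs", 0) arrive with offsets 10, 11 and 12;
  //      storeMessageAndMetadata() hands each (key, value) pair to the BlockGenerator and
  //      onAddData() keeps updating topicPartitionOffsetMap, which ends up as {("logs", 0) -> 12}.
  //   2. When the BlockGenerator cuts a block, onGenerateBlock() snapshots that map into
  //      blockOffsetMap under the new block id and clears topicPartitionOffsetMap.
  //   3. onPushBlock() then calls storeBlockAndCommitOffset(); only after the block has been
  //      successfully stored is the snapshot committed to Zookeeper and dropped from
  //      blockOffsetMap.
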
  /** Class to handle blocks generated by the block generator. */
  private final class GeneratedBlockHandler extends BlockGeneratorListener {

    def onAddData(data: Any, metadata: Any): Unit = {
      // Update the offset of the data that was added to the generator.
      if (metadata != null) {
        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
        updateOffset(topicAndPartition, offset)
      }
    }

    def onGenerateBlock(blockId: StreamBlockId): Unit = {
      // Remember the offsets of topics/partitions when a block has been generated.
      rememberBlockOffsets(blockId)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
      // Store the block and commit its offsets.
      storeBlockAndCommitOffset(blockId, arrayBuffer)
    }

    def onError(message: String, throwable: Throwable): Unit = {
      reportError(message, throwable)
    }
  }
}
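
// Usage sketch (illustrative only; not part of this class). The host, group id, topic name and
// checkpoint path below are placeholders. KafkaUtils.createStream uses ReliableKafkaReceiver
// instead of KafkaReceiver when the write-ahead log is enabled in the SparkConf:
//
//   val conf = new SparkConf()
//     .setAppName("ReliableKafkaExample")
//     .set("spark.streaming.receiver.writeAheadLog.enable", "true")
//   val ssc = new StreamingContext(conf, Seconds(2))
//   ssc.checkpoint("/tmp/checkpoint")  // the write-ahead log is stored under the checkpoint dir
//   val stream = KafkaUtils.createStream(
//     ssc, "zkhost:2181", "reliable-group", Map("logs" -> 1))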