/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kinesis

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kinesis.KinesisTestUtils._
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.util.Utils

abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
  with Eventually with BeforeAndAfter with BeforeAndAfterAll {

  // This is the name that KCL will use to save metadata to DynamoDB
  private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
  private val batchDuration = Seconds(1)

  // Dummy parameters for API testing
  private val dummyEndpointUrl = defaultEndpointUrl
  private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
  private val dummyAWSAccessKey = "dummyAccessKey"
  private val dummyAWSSecretKey = "dummySecretKey"

  private var testUtils: KinesisTestUtils = null
  private var ssc: StreamingContext = null
  private var sc: SparkContext = null

  override def beforeAll(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
    sc = new SparkContext(conf)

    runIfTestsEnabled("Prepare KinesisTestUtils") {
      testUtils = new KPLBasedKinesisTestUtils()
      testUtils.createStream()
    }
  }

  override def afterAll(): Unit = {
    if (ssc != null) {
      ssc.stop()
    }
    if (sc != null) {
      sc.stop()
    }
    if (testUtils != null) {
      // Delete the Kinesis stream as well as the DynamoDB table generated by
      // Kinesis Client Library when consuming the stream
      testUtils.deleteStream()
      testUtils.deleteDynamoDBTable(appName)
    }
  }

  before {
    ssc = new StreamingContext(sc, batchDuration)
  }

  after {
    if (ssc != null) {
      ssc.stop(stopSparkContext = false)
      ssc = null
    }
    if (testUtils != null) {
      testUtils.deleteDynamoDBTable(appName)
    }
  }
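
  // KCL keeps shard leases and checkpoints in a DynamoDB table named after the
  // application, which is why `appName` is randomized above and the table is
  // dropped after every test. A minimal sketch of what a helper like
  // `KinesisTestUtils.deleteDynamoDBTable` presumably does with the AWS SDK
  // (illustrative only, not the authoritative implementation):
  //
  //   import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
  //   import com.amazonaws.services.dynamodbv2.document.DynamoDB
  //   import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
  //
  //   val table = new DynamoDB(new AmazonDynamoDBClient()).getTable(appName)
  //   try { table.delete(); table.waitForDelete() }
  //   catch { case e: ResourceNotFoundException => /* table already gone */ }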

  test("KinesisUtils API") {
    val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
      dummyEndpointUrl, dummyRegionName,
      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
      dummyEndpointUrl, dummyRegionName,
      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
      dummyAWSAccessKey, dummyAWSSecretKey)
  }

  test("RDD generation") {
    val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
      dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
      StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
    assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])

    val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
    val time = Time(1000)

    // Generate block info data for testing
    val seqNumRanges1 = SequenceNumberRanges(
      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
    val blockId1 = StreamBlockId(kinesisStream.id, 123)
    val blockInfo1 = ReceivedBlockInfo(
      0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))

    val seqNumRanges2 = SequenceNumberRanges(
      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
    val blockId2 = StreamBlockId(kinesisStream.id, 345)
    val blockInfo2 = ReceivedBlockInfo(
      0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))

    // Verify that the generated KinesisBackedBlockRDD has all the right information
    val blockInfos = Seq(blockInfo1, blockInfo2)
    val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
    nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
    val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
    assert(kinesisRDD.regionName === dummyRegionName)
    assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
    assert(kinesisRDD.awsCredentialsOption ===
      Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
    assert(nonEmptyRDD.partitions.size === blockInfos.size)
    nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
    val partitions = nonEmptyRDD.partitions.map {
      _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
    assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
    assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
    assert(partitions.forall { _.isBlockIdValid === true })

    // Verify that a KinesisBackedBlockRDD is generated even when there are no blocks
    val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
    // Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]],
    // because the type parameter is erased at runtime
    emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
    emptyRDD.partitions shouldBe empty

    // Verify that the KinesisBackedBlockRDD partitions have isBlockIdValid = false
    // when the blocks are marked invalid
    blockInfos.foreach { _.setBlockIdInvalid() }
    kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
      assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
    }
  }
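
  // The remaining tests exercise a live Kinesis stream, so they are gated behind
  // `testIfEnabled` (inherited from KinesisFunSuite). A minimal sketch of the
  // gating such a helper presumably performs -- the actual implementation lives
  // in KinesisFunSuite and may differ:
  //
  //   def testIfEnabled(testName: String)(testBody: => Unit): Unit = {
  //     if (sys.env.get("RUN_KINESIS_TESTS").contains("1")) test(testName)(testBody)
  //     else ignore(s"$testName [enable by setting RUN_KINESIS_TESTS=1]")(testBody)
  //   }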

  /**
   * Test the stream by sending data to a Kinesis stream and receiving from it.
   * This test is not run by default, as it requires AWS credentials that the test
   * environment may not have. Even if AWS credentials are available, the user
   * may not want to run these tests, to avoid Kinesis costs. To enable this test,
   * you must have AWS credentials available through the default AWS provider chain,
   * and you have to set the system environment variable RUN_KINESIS_TESTS=1.
   */
  testIfEnabled("basic operation") {
    val awsCredentials = KinesisTestUtils.getAWSCredentials()
    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
      Seconds(10), StorageLevel.MEMORY_ONLY,
      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

    val collected = new mutable.HashSet[Int]
    stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
      collected.synchronized {
        collected ++= rdd.collect()
        logInfo("Collected = " + collected.mkString(", "))
      }
    }
    ssc.start()

    val testData = 1 to 10
    eventually(timeout(120 seconds), interval(10 second)) {
      testUtils.pushData(testData, aggregateTestData)
      assert(collected.synchronized { collected === testData.toSet },
        "\nData received does not match data sent")
    }
    ssc.stop(stopSparkContext = false)
  }

  testIfEnabled("custom message handling") {
    val awsCredentials = KinesisTestUtils.getAWSCredentials()
    def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
      Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

    stream shouldBe a [ReceiverInputDStream[_]]

    val collected = new mutable.HashSet[Int]
    stream.foreachRDD { rdd =>
      collected.synchronized {
        collected ++= rdd.collect()
        logInfo("Collected = " + collected.mkString(", "))
      }
    }
    ssc.start()

    val testData = 1 to 10
    eventually(timeout(120 seconds), interval(10 second)) {
      testUtils.pushData(testData, aggregateTestData)
      val modData = testData.map(_ + 5)
      assert(collected.synchronized { collected === modData.toSet },
        "\nData received does not match data sent")
    }
    ssc.stop(stopSparkContext = false)
  }
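
  // The message handler above only reads the payload; it can also pull Kinesis
  // metadata off the Record. A hedged usage sketch, mirroring the overload
  // exercised by `addFive` above (the `keyAndValue` handler is hypothetical):
  //
  //   def keyAndValue(r: Record): (String, String) =
  //     (r.getPartitionKey, JavaUtils.bytesToString(r.getData))
  //
  //   val keyedStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
  //     testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
  //     Seconds(10), StorageLevel.MEMORY_ONLY, keyAndValue,
  //     awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)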

  testIfEnabled("failure recovery") {
    val checkpointDir = Utils.createTempDir().getAbsolutePath

    // Replace the StreamingContext from `before` (never started) with one that checkpoints
    ssc = new StreamingContext(sc, Milliseconds(1000))
    ssc.checkpoint(checkpointDir)

    val awsCredentials = KinesisTestUtils.getAWSCredentials()
    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]

    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
      Seconds(10), StorageLevel.MEMORY_ONLY,
      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)

    // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data
    // in each batch
    kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
      val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
      collectedData.synchronized {
        collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
      }
    })

    ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
    ssc.start()

    def numBatchesWithData: Int =
      collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }

    def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty

    // Run until there are more than 10 batches with some data in them.
    // If this times out because numBatchesWithData stays zero, then it's likely that the
    // foreachRDD function failed with exceptions and nothing got added to `collectedData`
    eventually(timeout(2 minutes), interval(1 seconds)) {
      testUtils.pushData(1 to 5, aggregateTestData)
      assert(isCheckpointPresent && numBatchesWithData > 10)
    }
    ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused

    // Restart the context from the checkpoint and verify whether the recovered stream
    // regenerates the same Kinesis-backed RDDs
    logInfo("Restarting from checkpoint")
    ssc = new StreamingContext(checkpointDir)
    ssc.start()
    val recoveredKinesisStream = ssc.graph.getInputStreams().head

    // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
    // and return the same data
    collectedData.synchronized {
      val times = collectedData.keySet
      times.foreach { time =>
        val (arrayOfSeqNumRanges, data) = collectedData(time)
        val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
        rdd shouldBe a [KinesisBackedBlockRDD[_]]

        // Verify the recovered sequence ranges
        val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
        assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
        arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
          assert(expected.ranges.toSeq === found.ranges.toSeq)
        }

        // Verify the recovered data
        assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
      }
    }
    ssc.stop()
  }
}

class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)

class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
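
// To run only one of the concrete suites above against a live stream, an sbt
// invocation along these lines should work (module and command names assumed
// from the Spark 1.x source-tree layout; adjust for your build):
//
//   RUN_KINESIS_TESTS=1 build/sbt \
//     "streaming-kinesis-asl/test-only *WithAggregationKinesisStreamSuite"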