1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.streaming.kinesis
19
20import scala.collection.mutable
21import scala.concurrent.duration._
22import scala.language.postfixOps
23import scala.util.Random
24
25import com.amazonaws.regions.RegionUtils
26import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
27import com.amazonaws.services.kinesis.model.Record
28import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
29import org.scalatest.Matchers._
30import org.scalatest.concurrent.Eventually
31
32import org.apache.spark.{SparkConf, SparkContext}
33import org.apache.spark.network.util.JavaUtils
34import org.apache.spark.rdd.RDD
35import org.apache.spark.storage.{StorageLevel, StreamBlockId}
36import org.apache.spark.streaming._
37import org.apache.spark.streaming.dstream.ReceiverInputDStream
38import org.apache.spark.streaming.kinesis.KinesisTestUtils._
39import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
40import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
41import org.apache.spark.util.Utils
42
43abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
44  with Eventually with BeforeAndAfter with BeforeAndAfterAll {
45
46  // This is the name that KCL will use to save metadata to DynamoDB
47  private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
48  private val batchDuration = Seconds(1)
49
50  // Dummy parameters for API testing
51  private val dummyEndpointUrl = defaultEndpointUrl
52  private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
53  private val dummyAWSAccessKey = "dummyAccessKey"
54  private val dummyAWSSecretKey = "dummySecretKey"
55
56  private var testUtils: KinesisTestUtils = null
57  private var ssc: StreamingContext = null
58  private var sc: SparkContext = null
59
60  override def beforeAll(): Unit = {
61    val conf = new SparkConf()
62      .setMaster("local[4]")
63      .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
64    sc = new SparkContext(conf)
65
66    runIfTestsEnabled("Prepare KinesisTestUtils") {
67      testUtils = new KPLBasedKinesisTestUtils()
68      testUtils.createStream()
69    }
70  }
71
72  override def afterAll(): Unit = {
73    if (ssc != null) {
74      ssc.stop()
75    }
76    if (sc != null) {
77      sc.stop()
78    }
79    if (testUtils != null) {
80      // Delete the Kinesis stream as well as the DynamoDB table generated by
81      // Kinesis Client Library when consuming the stream
82      testUtils.deleteStream()
83      testUtils.deleteDynamoDBTable(appName)
84    }
85  }
86
87  before {
88    ssc = new StreamingContext(sc, batchDuration)
89  }
90
91  after {
92    if (ssc != null) {
93      ssc.stop(stopSparkContext = false)
94      ssc = null
95    }
96    if (testUtils != null) {
97      testUtils.deleteDynamoDBTable(appName)
98    }
99  }
100
101  test("KinesisUtils API") {
102    val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
103      dummyEndpointUrl, dummyRegionName,
104      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
105    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
106      dummyEndpointUrl, dummyRegionName,
107      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
108      dummyAWSAccessKey, dummyAWSSecretKey)
109  }
110
111  test("RDD generation") {
112    val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
113      dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
114      StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
115    assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
116
117    val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
118    val time = Time(1000)
119
120    // Generate block info data for testing
121    val seqNumRanges1 = SequenceNumberRanges(
122      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
123    val blockId1 = StreamBlockId(kinesisStream.id, 123)
124    val blockInfo1 = ReceivedBlockInfo(
125      0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
126
127    val seqNumRanges2 = SequenceNumberRanges(
128      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
129    val blockId2 = StreamBlockId(kinesisStream.id, 345)
130    val blockInfo2 = ReceivedBlockInfo(
131      0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
132
133    // Verify that the generated KinesisBackedBlockRDD has the all the right information
134    val blockInfos = Seq(blockInfo1, blockInfo2)
135    val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
136    nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
137    val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
138    assert(kinesisRDD.regionName === dummyRegionName)
139    assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
140    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
141    assert(kinesisRDD.awsCredentialsOption ===
142      Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
143    assert(nonEmptyRDD.partitions.size === blockInfos.size)
144    nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
145    val partitions = nonEmptyRDD.partitions.map {
146      _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
147    assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
148    assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
149    assert(partitions.forall { _.isBlockIdValid === true })
150
151    // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
152    val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
153    // Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]], because
154    // the type parameter will be erased at runtime
155    emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
156    emptyRDD.partitions shouldBe empty
157
158    // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
159    blockInfos.foreach { _.setBlockIdInvalid() }
160    kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
161      assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
162    }
163  }
164
165
166  /**
167   * Test the stream by sending data to a Kinesis stream and receiving from it.
168   * This test is not run by default as it requires AWS credentials that the test
169   * environment may not have. Even if there is AWS credentials available, the user
170   * may not want to run these tests to avoid the Kinesis costs. To enable this test,
171   * you must have AWS credentials available through the default AWS provider chain,
172   * and you have to set the system environment variable RUN_KINESIS_TESTS=1 .
173   */
174  testIfEnabled("basic operation") {
175    val awsCredentials = KinesisTestUtils.getAWSCredentials()
176    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
177      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
178      Seconds(10), StorageLevel.MEMORY_ONLY,
179      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
180
181    val collected = new mutable.HashSet[Int]
182    stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
183      collected.synchronized {
184        collected ++= rdd.collect()
185        logInfo("Collected = " + collected.mkString(", "))
186      }
187    }
188    ssc.start()
189
190    val testData = 1 to 10
191    eventually(timeout(120 seconds), interval(10 second)) {
192      testUtils.pushData(testData, aggregateTestData)
193      assert(collected.synchronized { collected === testData.toSet },
194        "\nData received does not match data sent")
195    }
196    ssc.stop(stopSparkContext = false)
197  }
198
199  testIfEnabled("custom message handling") {
200    val awsCredentials = KinesisTestUtils.getAWSCredentials()
201    def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
202    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
203      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
204      Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
205      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
206
207    stream shouldBe a [ReceiverInputDStream[_]]
208
209    val collected = new mutable.HashSet[Int]
210    stream.foreachRDD { rdd =>
211      collected.synchronized {
212        collected ++= rdd.collect()
213        logInfo("Collected = " + collected.mkString(", "))
214      }
215    }
216    ssc.start()
217
218    val testData = 1 to 10
219    eventually(timeout(120 seconds), interval(10 second)) {
220      testUtils.pushData(testData, aggregateTestData)
221      val modData = testData.map(_ + 5)
222      assert(collected.synchronized { collected === modData.toSet },
223        "\nData received does not match data sent")
224    }
225    ssc.stop(stopSparkContext = false)
226  }
227
228  testIfEnabled("failure recovery") {
229    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
230    val checkpointDir = Utils.createTempDir().getAbsolutePath
231
232    ssc = new StreamingContext(sc, Milliseconds(1000))
233    ssc.checkpoint(checkpointDir)
234
235    val awsCredentials = KinesisTestUtils.getAWSCredentials()
236    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
237
238    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
239      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
240      Seconds(10), StorageLevel.MEMORY_ONLY,
241      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
242
243    // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
244    kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
245      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
246      val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
247      collectedData.synchronized {
248        collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
249      }
250    })
251
252    ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
253    ssc.start()
254
255    def numBatchesWithData: Int =
256      collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
257
258    def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
259
260    // Run until there are at least 10 batches with some data in them
261    // If this times out because numBatchesWithData is empty, then its likely that foreachRDD
262    // function failed with exceptions, and nothing got added to `collectedData`
263    eventually(timeout(2 minutes), interval(1 seconds)) {
264      testUtils.pushData(1 to 5, aggregateTestData)
265      assert(isCheckpointPresent && numBatchesWithData > 10)
266    }
267    ssc.stop(stopSparkContext = true)  // stop the SparkContext so that the blocks are not reused
268
269    // Restart the context from checkpoint and verify whether the
270    logInfo("Restarting from checkpoint")
271    ssc = new StreamingContext(checkpointDir)
272    ssc.start()
273    val recoveredKinesisStream = ssc.graph.getInputStreams().head
274
275    // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
276    // and return the same data
277    collectedData.synchronized {
278      val times = collectedData.keySet
279      times.foreach { time =>
280        val (arrayOfSeqNumRanges, data) = collectedData(time)
281        val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
282        rdd shouldBe a[KinesisBackedBlockRDD[_]]
283
284        // Verify the recovered sequence ranges
285        val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
286        assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
287        arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
288          assert(expected.ranges.toSeq === found.ranges.toSeq)
289        }
290
291        // Verify the recovered data
292        assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
293      }
294    }
295    ssc.stop()
296  }
297}
298
299class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
300
301class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
302