1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.spark.streaming.kafka; 19 20 import java.io.Serializable; 21 import java.util.HashMap; 22 import java.util.Map; 23 24 import scala.Tuple2; 25 26 import kafka.common.TopicAndPartition; 27 import kafka.message.MessageAndMetadata; 28 import kafka.serializer.StringDecoder; 29 import org.junit.After; 30 import org.junit.Assert; 31 import org.junit.Before; 32 import org.junit.Test; 33 34 import org.apache.spark.SparkConf; 35 import org.apache.spark.api.java.JavaRDD; 36 import org.apache.spark.api.java.JavaSparkContext; 37 import org.apache.spark.api.java.function.Function; 38 39 public class JavaKafkaRDDSuite implements Serializable { 40 private transient JavaSparkContext sc = null; 41 private transient KafkaTestUtils kafkaTestUtils = null; 42 43 @Before setUp()44 public void setUp() { 45 kafkaTestUtils = new KafkaTestUtils(); 46 kafkaTestUtils.setup(); 47 SparkConf sparkConf = new SparkConf() 48 .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); 49 sc = new JavaSparkContext(sparkConf); 50 } 51 52 @After tearDown()53 public void tearDown() { 54 if (sc != null) { 55 sc.stop(); 56 sc = null; 57 } 58 59 if (kafkaTestUtils != null) { 60 kafkaTestUtils.teardown(); 61 kafkaTestUtils = null; 62 } 63 } 64 65 @Test testKafkaRDD()66 public void testKafkaRDD() throws InterruptedException { 67 String topic1 = "topic1"; 68 String topic2 = "topic2"; 69 70 createTopicAndSendData(topic1); 71 createTopicAndSendData(topic2); 72 73 Map<String, String> kafkaParams = new HashMap<>(); 74 kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); 75 76 OffsetRange[] offsetRanges = { 77 OffsetRange.create(topic1, 0, 0, 1), 78 OffsetRange.create(topic2, 0, 0, 1) 79 }; 80 81 Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>(); 82 Map<TopicAndPartition, Broker> leaders = new HashMap<>(); 83 String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); 84 Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); 85 leaders.put(new TopicAndPartition(topic1, 0), broker); 86 leaders.put(new TopicAndPartition(topic2, 0), broker); 87 88 JavaRDD<String> rdd1 = KafkaUtils.createRDD( 89 sc, 90 String.class, 91 String.class, 92 StringDecoder.class, 93 StringDecoder.class, 94 kafkaParams, 95 offsetRanges 96 ).map( 97 new Function<Tuple2<String, String>, String>() { 98 @Override 99 public String call(Tuple2<String, String> kv) { 100 return kv._2(); 101 } 102 } 103 ); 104 105 JavaRDD<String> rdd2 = KafkaUtils.createRDD( 106 sc, 107 String.class, 108 String.class, 109 StringDecoder.class, 110 StringDecoder.class, 111 String.class, 112 kafkaParams, 113 offsetRanges, 114 emptyLeaders, 115 new Function<MessageAndMetadata<String, String>, String>() { 116 @Override 117 public String call(MessageAndMetadata<String, String> msgAndMd) { 118 return msgAndMd.message(); 119 } 120 } 121 ); 122 123 JavaRDD<String> rdd3 = KafkaUtils.createRDD( 124 sc, 125 String.class, 126 String.class, 127 StringDecoder.class, 128 StringDecoder.class, 129 String.class, 130 kafkaParams, 131 offsetRanges, 132 leaders, 133 new Function<MessageAndMetadata<String, String>, String>() { 134 @Override 135 public String call(MessageAndMetadata<String, String> msgAndMd) { 136 return msgAndMd.message(); 137 } 138 } 139 ); 140 141 // just making sure the java user apis work; the scala tests handle logic corner cases 142 long count1 = rdd1.count(); 143 long count2 = rdd2.count(); 144 long count3 = rdd3.count(); 145 Assert.assertTrue(count1 > 0); 146 Assert.assertEquals(count1, count2); 147 Assert.assertEquals(count1, count3); 148 } 149 createTopicAndSendData(String topic)150 private String[] createTopicAndSendData(String topic) { 151 String[] data = { topic + "-1", topic + "-2", topic + "-3"}; 152 kafkaTestUtils.createTopic(topic, 1); 153 kafkaTestUtils.sendMessages(topic, data); 154 return data; 155 } 156 } 157