/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class JavaKafkaRDDSuite implements Serializable {
  private transient JavaSparkContext sc = null;
  private transient KafkaTestUtils kafkaTestUtils = null;

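  // Each test gets a fresh embedded Kafka broker (via KafkaTestUtils) and a local SparkContext.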
  @Before
  public void setUp() {
    kafkaTestUtils = new KafkaTestUtils();
    kafkaTestUtils.setup();
    SparkConf sparkConf = new SparkConf()
      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
    sc = new JavaSparkContext(sparkConf);
  }

  @After
  public void tearDown() {
    if (sc != null) {
      sc.stop();
      sc = null;
    }

    if (kafkaTestUtils != null) {
      kafkaTestUtils.teardown();
      kafkaTestUtils = null;
    }
  }

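  // Sanity test for the three Java createRDD call patterns; each should read back
  // the same messages from the two test topics.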
  @Test
  public void testKafkaRDD() throws InterruptedException {
    String topic1 = "topic1";
    String topic2 = "topic2";

    createTopicAndSendData(topic1);
    createTopicAndSendData(topic2);

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());

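    // Read offsets [0, 1) from partition 0 of each topic, i.e. exactly one message apiece.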
    OffsetRange[] offsetRanges = {
      OffsetRange.create(topic1, 0, 0, 1),
      OffsetRange.create(topic2, 0, 0, 1)
    };

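    // Leader metadata is optional: with an empty map the leaders are looked up from
    // the brokers, while a populated map supplies them up front.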
    Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
    Map<TopicAndPartition, Broker> leaders = new HashMap<>();
    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
    Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    leaders.put(new TopicAndPartition(topic1, 0), broker);
    leaders.put(new TopicAndPartition(topic2, 0), broker);

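    // Variant 1: plain key/value pair RDD, keeping only the message values.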
    JavaRDD<String> rdd1 = KafkaUtils.createRDD(
        sc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        offsetRanges
    ).map(
        new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(Tuple2<String, String> kv) {
            return kv._2();
          }
        }
    );

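    // Variant 2: message-handler API with an empty leader map (leaders are looked up).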
    JavaRDD<String> rdd2 = KafkaUtils.createRDD(
        sc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParams,
        offsetRanges,
        emptyLeaders,
        new Function<MessageAndMetadata<String, String>, String>() {
          @Override
          public String call(MessageAndMetadata<String, String> msgAndMd) {
            return msgAndMd.message();
          }
        }
    );

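    // Variant 3: same message handler, with the partition leaders supplied explicitly.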
    JavaRDD<String> rdd3 = KafkaUtils.createRDD(
        sc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        String.class,
        kafkaParams,
        offsetRanges,
        leaders,
        new Function<MessageAndMetadata<String, String>, String>() {
          @Override
          public String call(MessageAndMetadata<String, String> msgAndMd) {
            return msgAndMd.message();
          }
        }
    );

    // Just sanity-check that the Java user APIs work; the Scala tests cover the corner cases.
    long count1 = rdd1.count();
    long count2 = rdd2.count();
    long count3 = rdd3.count();
    Assert.assertTrue(count1 > 0);
    Assert.assertEquals(count1, count2);
    Assert.assertEquals(count1, count3);
  }

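  // Creates a single-partition topic and publishes three messages to it.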
  private String[] createTopicAndSendData(String topic) {
    String[] data = { topic + "-1", topic + "-2", topic + "-3" };
    kafkaTestUtils.createTopic(topic, 1);
    kafkaTestUtils.sendMessages(topic, data);
    return data;
  }
}