1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapred;
19 
20 import java.io.BufferedReader;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InputStreamReader;
24 import java.io.OutputStream;
25 import java.io.OutputStreamWriter;
26 import java.io.Writer;
27 import java.util.Iterator;
28 import java.util.StringTokenizer;
29 
30 import org.apache.hadoop.fs.FileUtil;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.io.LongWritable;
33 import org.apache.hadoop.io.Text;
34 import org.apache.hadoop.io.serializer.JavaSerializationComparator;
35 
36 public class TestJavaSerialization extends ClusterMapReduceTestCase {
37 
38   static class WordCountMapper extends MapReduceBase implements
39       Mapper<LongWritable, Text, String, Long> {
40 
map(LongWritable key, Text value, OutputCollector<String, Long> output, Reporter reporter)41     public void map(LongWritable key, Text value,
42         OutputCollector<String, Long> output, Reporter reporter)
43         throws IOException {
44       StringTokenizer st = new StringTokenizer(value.toString());
45       while (st.hasMoreTokens()) {
46         output.collect(st.nextToken(), 1L);
47       }
48     }
49 
50   }
51 
52   static class SumReducer<K> extends MapReduceBase implements
53       Reducer<K, Long, K, Long> {
54 
reduce(K key, Iterator<Long> values, OutputCollector<K, Long> output, Reporter reporter)55     public void reduce(K key, Iterator<Long> values,
56         OutputCollector<K, Long> output, Reporter reporter)
57       throws IOException {
58 
59       long sum = 0;
60       while (values.hasNext()) {
61         sum += values.next();
62       }
63       output.collect(key, sum);
64     }
65 
66   }
67 
testMapReduceJob()68   public void testMapReduceJob() throws Exception {
69     OutputStream os = getFileSystem().create(new Path(getInputDir(),
70         "text.txt"));
71     Writer wr = new OutputStreamWriter(os);
72     wr.write("b a\n");
73     wr.close();
74 
75     JobConf conf = createJobConf();
76     conf.setJobName("JavaSerialization");
77 
78     conf.set("io.serializations",
79     "org.apache.hadoop.io.serializer.JavaSerialization," +
80     "org.apache.hadoop.io.serializer.WritableSerialization");
81 
82     conf.setInputFormat(TextInputFormat.class);
83 
84     conf.setOutputKeyClass(String.class);
85     conf.setOutputValueClass(Long.class);
86     conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
87 
88     conf.setMapperClass(WordCountMapper.class);
89     conf.setReducerClass(SumReducer.class);
90 
91     FileInputFormat.setInputPaths(conf, getInputDir());
92 
93     FileOutputFormat.setOutputPath(conf, getOutputDir());
94 
95     JobClient.runJob(conf);
96 
97     Path[] outputFiles = FileUtil.stat2Paths(
98                            getFileSystem().listStatus(getOutputDir(),
99                            new Utils.OutputFileUtils.OutputFilesFilter()));
100     assertEquals(1, outputFiles.length);
101     InputStream is = getFileSystem().open(outputFiles[0]);
102     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
103     assertEquals("a\t1", reader.readLine());
104     assertEquals("b\t1", reader.readLine());
105     assertNull(reader.readLine());
106     reader.close();
107   }
108 
109   /**
110    * HADOOP-4466:
111    * This test verifies the JavSerialization impl can write to SequenceFiles. by virtue other
112    * SequenceFileOutputFormat is not coupled to Writable types, if so, the job will fail.
113    *
114    */
testWriteToSequencefile()115   public void testWriteToSequencefile() throws Exception {
116     OutputStream os = getFileSystem().create(new Path(getInputDir(),
117         "text.txt"));
118     Writer wr = new OutputStreamWriter(os);
119     wr.write("b a\n");
120     wr.close();
121 
122     JobConf conf = createJobConf();
123     conf.setJobName("JavaSerialization");
124 
125     conf.set("io.serializations",
126     "org.apache.hadoop.io.serializer.JavaSerialization," +
127     "org.apache.hadoop.io.serializer.WritableSerialization");
128 
129     conf.setInputFormat(TextInputFormat.class);
130     conf.setOutputFormat(SequenceFileOutputFormat.class); // test we can write to sequence files
131 
132     conf.setOutputKeyClass(String.class);
133     conf.setOutputValueClass(Long.class);
134     conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
135 
136     conf.setMapperClass(WordCountMapper.class);
137     conf.setReducerClass(SumReducer.class);
138 
139     FileInputFormat.setInputPaths(conf, getInputDir());
140 
141     FileOutputFormat.setOutputPath(conf, getOutputDir());
142 
143     JobClient.runJob(conf);
144 
145     Path[] outputFiles = FileUtil.stat2Paths(
146                            getFileSystem().listStatus(getOutputDir(),
147                            new Utils.OutputFileUtils.OutputFilesFilter()));
148     assertEquals(1, outputFiles.length);
149 }
150 
151 }
152