/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.JavaSerializationComparator;

/**
 * Verifies that MapReduce jobs can use plain Java types ({@code String},
 * {@code Long}) as map output keys/values when
 * {@code org.apache.hadoop.io.serializer.JavaSerialization} is registered
 * via the {@code io.serializations} configuration property.
 */
public class TestJavaSerialization extends ClusterMapReduceTestCase {

  /**
   * Tokenizes each input line and emits {@code (token, 1L)} pairs using
   * plain Java {@code String}/{@code Long} types instead of Writables.
   */
  static class WordCountMapper extends MapReduceBase implements
      Mapper<LongWritable, Text, String, Long> {

    public void map(LongWritable key, Text value,
        OutputCollector<String, Long> output, Reporter reporter)
        throws IOException {
      StringTokenizer st = new StringTokenizer(value.toString());
      while (st.hasMoreTokens()) {
        output.collect(st.nextToken(), 1L);
      }
    }

  }

  /**
   * Sums the {@code Long} values for each key; generic in the key type so
   * it works for any Java-serializable key.
   */
  static class SumReducer<K> extends MapReduceBase implements
      Reducer<K, Long, K, Long> {

    public void reduce(K key, Iterator<Long> values,
        OutputCollector<K, Long> output, Reporter reporter)
        throws IOException {
      long sum = 0;
      while (values.hasNext()) {
        sum += values.next();
      }
      output.collect(key, sum);
    }

  }

  /**
   * Writes the two-word input file {@code text.txt} ("b a") into the test
   * input directory, so the word-count output is deterministic.
   */
  private void createInput() throws IOException {
    OutputStream os = getFileSystem().create(new Path(getInputDir(),
        "text.txt"));
    // Pin the charset so the test does not depend on the platform default.
    Writer wr = new OutputStreamWriter(os, "UTF-8");
    try {
      wr.write("b a\n");
    } finally {
      wr.close();
    }
  }

  /**
   * Builds a JobConf wired for Java serialization: String keys, Long values,
   * the JavaSerializationComparator for sorting, and the word-count
   * mapper/reducer above. The output format is left at the default so
   * individual tests can override it.
   */
  private JobConf createJavaSerializationJobConf() {
    JobConf conf = createJobConf();
    conf.setJobName("JavaSerialization");

    // JavaSerialization must be registered explicitly; WritableSerialization
    // is kept so the framework's own Writable types still work.
    conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.JavaSerialization," +
        "org.apache.hadoop.io.serializer.WritableSerialization");

    conf.setInputFormat(TextInputFormat.class);

    conf.setOutputKeyClass(String.class);
    conf.setOutputValueClass(Long.class);
    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);

    conf.setMapperClass(WordCountMapper.class);
    conf.setReducerClass(SumReducer.class);

    FileInputFormat.setInputPaths(conf, getInputDir());
    FileOutputFormat.setOutputPath(conf, getOutputDir());
    return conf;
  }

  /**
   * Runs a word-count job over the input "b a" using Java-serialized
   * String/Long records and checks the sorted text output is exactly
   * "a\t1" then "b\t1".
   */
  public void testMapReduceJob() throws Exception {
    createInput();

    JobConf conf = createJavaSerializationJobConf();

    JobClient.runJob(conf);

    Path[] outputFiles = FileUtil.stat2Paths(
        getFileSystem().listStatus(getOutputDir(),
            new Utils.OutputFileUtils.OutputFilesFilter()));
    assertEquals(1, outputFiles.length);
    InputStream is = getFileSystem().open(outputFiles[0]);
    BufferedReader reader = new BufferedReader(new InputStreamReader(is,
        "UTF-8"));
    try {
      // Keys are sorted by JavaSerializationComparator, so "a" precedes "b".
      assertEquals("a\t1", reader.readLine());
      assertEquals("b\t1", reader.readLine());
      assertNull(reader.readLine());
    } finally {
      // Close in finally so a failed assertion does not leak the stream.
      reader.close();
    }
  }

  /**
   * HADOOP-4466:
   * Verifies the JavaSerialization implementation can write to
   * SequenceFiles, i.e. that SequenceFileOutputFormat is not coupled to
   * Writable types; if it were, the job would fail.
   */
  public void testWriteToSequencefile() throws Exception {
    createInput();

    JobConf conf = createJavaSerializationJobConf();
    // Override the default output format to prove sequence files accept
    // Java-serialized (non-Writable) keys and values.
    conf.setOutputFormat(SequenceFileOutputFormat.class);

    JobClient.runJob(conf);

    Path[] outputFiles = FileUtil.stat2Paths(
        getFileSystem().listStatus(getOutputDir(),
            new Utils.OutputFileUtils.OutputFilesFilter()));
    assertEquals(1, outputFiles.length);
  }

}