/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import junit.extensions.TestSetup;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;

/**
 * Tests for {@link TotalOrderPartitioner}.
 *
 * <p>Each test writes a partition file of split points, configures a
 * partitioner from it and checks that a fixed set of keys is routed to the
 * expected partitions. The expectations encode that keys sorting below the
 * first split go to partition 0, and that a key equal to split {@code i - 1}
 * lands in partition {@code i}.
 */
public class TestTotalOrderPartitioner extends TestCase {

  /**
   * Configuration key set to {@code false} by the binary-search and
   * custom-comparator tests. Going by the test names, this disables the
   * natural-order lookup so a comparator-driven search is used instead.
   */
  private static final String NATURAL_ORDER_KEY =
      "total.order.partitioner.natural.order";

  /** Sorted split points; the trailing comment is the partition each one opens. */
  private static final Text[] splitStrings = new Text[] {
    // -inf            // 0
    new Text("aabbb"), // 1
    new Text("babbb"), // 2
    new Text("daddd"), // 3
    new Text("dddee"), // 4
    new Text("ddhee"), // 5
    new Text("dingo"), // 6
    new Text("hijjj"), // 7
    new Text("n"),     // 8
    new Text("yak"),   // 9
  };

  /** A key paired with the partition it is expected to be assigned to. */
  static class Check<T> {
    final T data;
    final int part;

    Check(T data, int part) {
      this.data = data;
      this.part = part;
    }
  }

  /** Keys and expected partitions under natural (ascending) {@link Text} order. */
  private static final ArrayList<Check<Text>> testStrings =
      new ArrayList<Check<Text>>();
  static {
    testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
    testStrings.add(new Check<Text>(new Text("aaabb"), 0));
    testStrings.add(new Check<Text>(new Text("aabbb"), 1));
    testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
    testStrings.add(new Check<Text>(new Text("babbb"), 2));
    testStrings.add(new Check<Text>(new Text("baabb"), 1));
    testStrings.add(new Check<Text>(new Text("yai"), 8));
    testStrings.add(new Check<Text>(new Text("yak"), 9));
    testStrings.add(new Check<Text>(new Text("z"), 9));
    testStrings.add(new Check<Text>(new Text("ddngo"), 5));
    testStrings.add(new Check<Text>(new Text("hi"), 6));
  }

  /**
   * Writes {@code splits} to a SequenceFile partition file and registers it on
   * {@code conf}.
   *
   * <p>The file is created under {@code test.build.data} (default
   * {@code /tmp}) in a directory named after {@code testname}. The reducer
   * count is set to {@code splits.length + 1}.
   *
   * @param testname name of the per-test directory for the partition file
   * @param conf job configuration to update with the partition file and
   *     reducer count
   * @param splits split points in the order they should be written; the key
   *     class is taken from the first element
   * @return the path of the written partition file
   * @throws IllegalArgumentException if {@code splits} is empty, because the
   *     key class cannot be determined
   * @throws IOException if the file cannot be written
   */
  private static <T extends WritableComparable> Path writePartitionFile(
      String testname, JobConf conf, T[] splits) throws IOException {
    if (splits.length == 0) {
      throw new IllegalArgumentException(
          "At least one split point is required to infer the key class");
    }
    final FileSystem fs = FileSystem.getLocal(conf);
    final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
        ).makeQualified(fs);
    Path p = new Path(testdir, testname + "/_partition.lst");
    TotalOrderPartitioner.setPartitionFile(conf, p);
    conf.setNumReduceTasks(splits.length + 1);
    final NullWritable nw = NullWritable.get();
    SequenceFile.Writer w = null;
    try {
      w = SequenceFile.createWriter(fs, conf, p,
          splits[0].getClass(), NullWritable.class,
          SequenceFile.CompressionType.NONE);
      for (T split : splits) {
        w.append(split, nw);
      }
    } finally {
      if (null != w) {
        w.close();
      }
    }
    return p;
  }

  /**
   * Configures {@code partitioner} from {@code job} and asserts that every key
   * in {@code checks} is mapped to its expected partition.
   *
   * @param partitioner partitioner under test
   * @param job configuration carrying the partition file and options
   * @param checks keys and their expected partitions
   */
  private static void assertPartitions(
      TotalOrderPartitioner<Text,NullWritable> partitioner, JobConf job,
      ArrayList<Check<Text>> checks) {
    partitioner.configure(job);
    final NullWritable nw = NullWritable.get();
    final int numPartitions = splitStrings.length + 1;
    for (Check<Text> chk : checks) {
      assertEquals(chk.data.toString(), chk.part,
          partitioner.getPartition(chk.data, nw, numPartitions));
    }
  }

  /**
   * Deletes the per-test directory that {@link #writePartitionFile} created
   * for the partition file {@code p}, including the file itself.
   */
  private static void deleteTestDir(Path p, JobConf job) throws IOException {
    p.getFileSystem(job).delete(p.getParent(), true);
  }

  /** Partitions {@link Text} keys using the default (natural-order) configuration. */
  public void testTotalOrderMemCmp() throws Exception {
    TotalOrderPartitioner<Text,NullWritable> partitioner =
        new TotalOrderPartitioner<Text,NullWritable>();
    JobConf job = new JobConf();
    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
        "totalordermemcmp", job, splitStrings);
    job.setMapOutputKeyClass(Text.class);
    try {
      assertPartitions(partitioner, job, testStrings);
    } finally {
      deleteTestDir(p, job);
    }
  }

  /** Partitions {@link Text} keys with natural-order lookup disabled. */
  public void testTotalOrderBinarySearch() throws Exception {
    TotalOrderPartitioner<Text,NullWritable> partitioner =
        new TotalOrderPartitioner<Text,NullWritable>();
    JobConf job = new JobConf();
    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
        "totalorderbinarysearch", job, splitStrings);
    job.setBoolean(NATURAL_ORDER_KEY, false);
    job.setMapOutputKeyClass(Text.class);
    try {
      assertPartitions(partitioner, job, testStrings);
    } finally {
      deleteTestDir(p, job);
    }
  }

  /**
   * Orders {@link Text} keys in descending byte order.
   *
   * <p>The raw form skips each value's vint length prefix (sized with
   * {@link WritableUtils#decodeVIntSize}) and compares the remaining bytes
   * with the operands swapped.
   */
  public static class ReverseStringComparator implements RawComparator<Text> {
    @Override
    public int compare(Text a, Text b) {
      // Swapping the operands reverses the order without negating the result.
      return b.compareTo(a);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
      return WritableComparator.compareBytes(b2, s2 + n2, l2 - n2,
                                             b1, s1 + n1, l1 - n1);
    }
  }

  /**
   * Partitions {@link Text} keys under {@link ReverseStringComparator}.
   * The split points are re-sorted into descending order first, so the
   * expected partitions mirror the natural-order ones.
   */
  public void testTotalOrderCustomComparator() throws Exception {
    TotalOrderPartitioner<Text,NullWritable> partitioner =
        new TotalOrderPartitioner<Text,NullWritable>();
    JobConf job = new JobConf();
    Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
    Arrays.sort(revSplitStrings, new ReverseStringComparator());
    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
        "totalordercustomcomparator", job, revSplitStrings);
    job.setBoolean(NATURAL_ORDER_KEY, false);
    job.setMapOutputKeyClass(Text.class);
    job.setOutputKeyComparatorClass(ReverseStringComparator.class);
    ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
    revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
    revCheck.add(new Check<Text>(new Text("aaabb"), 9));
    revCheck.add(new Check<Text>(new Text("aabbb"), 9));
    revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
    revCheck.add(new Check<Text>(new Text("babbb"), 8));
    revCheck.add(new Check<Text>(new Text("baabb"), 8));
    revCheck.add(new Check<Text>(new Text("yai"), 1));
    revCheck.add(new Check<Text>(new Text("yak"), 1));
    revCheck.add(new Check<Text>(new Text("z"), 0));
    revCheck.add(new Check<Text>(new Text("ddngo"), 4));
    revCheck.add(new Check<Text>(new Text("hi"), 3));
    try {
      assertPartitions(partitioner, job, revCheck);
    } finally {
      deleteTestDir(p, job);
    }
  }

}