1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred.lib;
20 
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 
25 import junit.framework.Test;
26 import junit.framework.TestCase;
27 import junit.framework.TestSuite;
28 import junit.extensions.TestSetup;
29 
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.io.NullWritable;
33 import org.apache.hadoop.io.RawComparator;
34 import org.apache.hadoop.io.SequenceFile;
35 import org.apache.hadoop.io.Text;
36 import org.apache.hadoop.io.WritableComparable;
37 import org.apache.hadoop.io.WritableComparator;
38 import org.apache.hadoop.io.WritableUtils;
39 import org.apache.hadoop.mapred.JobConf;
40 
41 public class TestTotalOrderPartitioner extends TestCase {
42 
43   private static final Text[] splitStrings = new Text[] {
44     // -inf            // 0
45     new Text("aabbb"), // 1
46     new Text("babbb"), // 2
47     new Text("daddd"), // 3
48     new Text("dddee"), // 4
49     new Text("ddhee"), // 5
50     new Text("dingo"), // 6
51     new Text("hijjj"), // 7
52     new Text("n"),     // 8
53     new Text("yak"),   // 9
54   };
55 
56   static class Check<T> {
57     T data;
58     int part;
Check(T data, int part)59     Check(T data, int part) {
60       this.data = data;
61       this.part = part;
62     }
63   }
64 
65   private static final ArrayList<Check<Text>> testStrings =
66     new ArrayList<Check<Text>>();
67   static {
testStrings.add(new Check<Text>(new Text(R), 0))68     testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
testStrings.add(new Check<Text>(new Text(R), 0))69     testStrings.add(new Check<Text>(new Text("aaabb"), 0));
testStrings.add(new Check<Text>(new Text(R), 1))70     testStrings.add(new Check<Text>(new Text("aabbb"), 1));
testStrings.add(new Check<Text>(new Text(R), 0))71     testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
testStrings.add(new Check<Text>(new Text(R), 2))72     testStrings.add(new Check<Text>(new Text("babbb"), 2));
testStrings.add(new Check<Text>(new Text(R), 1))73     testStrings.add(new Check<Text>(new Text("baabb"), 1));
testStrings.add(new Check<Text>(new Text(R), 8))74     testStrings.add(new Check<Text>(new Text("yai"), 8));
testStrings.add(new Check<Text>(new Text(R), 9))75     testStrings.add(new Check<Text>(new Text("yak"), 9));
testStrings.add(new Check<Text>(new Text(R), 9))76     testStrings.add(new Check<Text>(new Text("z"), 9));
testStrings.add(new Check<Text>(new Text(R), 5))77     testStrings.add(new Check<Text>(new Text("ddngo"), 5));
testStrings.add(new Check<Text>(new Text(R), 6))78     testStrings.add(new Check<Text>(new Text("hi"), 6));
79   };
80 
writePartitionFile( String testname, JobConf conf, T[] splits)81   private static <T extends WritableComparable> Path writePartitionFile(
82       String testname, JobConf conf, T[] splits) throws IOException {
83     final FileSystem fs = FileSystem.getLocal(conf);
84     final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
85                                  ).makeQualified(fs);
86     Path p = new Path(testdir, testname + "/_partition.lst");
87     TotalOrderPartitioner.setPartitionFile(conf, p);
88     conf.setNumReduceTasks(splits.length + 1);
89     SequenceFile.Writer w = null;
90     try {
91       NullWritable nw = NullWritable.get();
92       w = SequenceFile.createWriter(fs, conf, p,
93           splits[0].getClass(), NullWritable.class,
94           SequenceFile.CompressionType.NONE);
95       for (int i = 0; i < splits.length; ++i) {
96         w.append(splits[i], NullWritable.get());
97       }
98     } finally {
99       if (null != w)
100         w.close();
101     }
102     return p;
103   }
104 
testTotalOrderMemCmp()105   public void testTotalOrderMemCmp() throws Exception {
106     TotalOrderPartitioner<Text,NullWritable> partitioner =
107       new TotalOrderPartitioner<Text,NullWritable>();
108     JobConf job = new JobConf();
109     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
110         "totalordermemcmp", job, splitStrings);
111     job.setMapOutputKeyClass(Text.class);
112     try {
113       partitioner.configure(job);
114       NullWritable nw = NullWritable.get();
115       for (Check<Text> chk : testStrings) {
116         assertEquals(chk.data.toString(), chk.part,
117             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
118       }
119     } finally {
120       p.getFileSystem(job).delete(p);
121     }
122   }
123 
testTotalOrderBinarySearch()124   public void testTotalOrderBinarySearch() throws Exception {
125     TotalOrderPartitioner<Text,NullWritable> partitioner =
126       new TotalOrderPartitioner<Text,NullWritable>();
127     JobConf job = new JobConf();
128     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
129         "totalorderbinarysearch", job, splitStrings);
130     job.setBoolean("total.order.partitioner.natural.order", false);
131     job.setMapOutputKeyClass(Text.class);
132     try {
133       partitioner.configure(job);
134       NullWritable nw = NullWritable.get();
135       for (Check<Text> chk : testStrings) {
136         assertEquals(chk.data.toString(), chk.part,
137             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
138       }
139     } finally {
140       p.getFileSystem(job).delete(p);
141     }
142   }
143 
144   public static class ReverseStringComparator implements RawComparator<Text> {
compare(Text a, Text b)145     public int compare(Text a, Text b) {
146       return -a.compareTo(b);
147     }
compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)148     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
149       int n1 = WritableUtils.decodeVIntSize(b1[s1]);
150       int n2 = WritableUtils.decodeVIntSize(b2[s2]);
151       return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1,
152                                                   b2, s2+n2, l2-n2);
153     }
154   }
155 
testTotalOrderCustomComparator()156   public void testTotalOrderCustomComparator() throws Exception {
157     TotalOrderPartitioner<Text,NullWritable> partitioner =
158       new TotalOrderPartitioner<Text,NullWritable>();
159     JobConf job = new JobConf();
160     Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
161     Arrays.sort(revSplitStrings, new ReverseStringComparator());
162     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
163         "totalordercustomcomparator", job, revSplitStrings);
164     job.setBoolean("total.order.partitioner.natural.order", false);
165     job.setMapOutputKeyClass(Text.class);
166     job.setOutputKeyComparatorClass(ReverseStringComparator.class);
167     ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
168     revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
169     revCheck.add(new Check<Text>(new Text("aaabb"), 9));
170     revCheck.add(new Check<Text>(new Text("aabbb"), 9));
171     revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
172     revCheck.add(new Check<Text>(new Text("babbb"), 8));
173     revCheck.add(new Check<Text>(new Text("baabb"), 8));
174     revCheck.add(new Check<Text>(new Text("yai"), 1));
175     revCheck.add(new Check<Text>(new Text("yak"), 1));
176     revCheck.add(new Check<Text>(new Text("z"), 0));
177     revCheck.add(new Check<Text>(new Text("ddngo"), 4));
178     revCheck.add(new Check<Text>(new Text("hi"), 3));
179     try {
180       partitioner.configure(job);
181       NullWritable nw = NullWritable.get();
182       for (Check<Text> chk : revCheck) {
183         assertEquals(chk.data.toString(), chk.part,
184             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
185       }
186     } finally {
187       p.getFileSystem(job).delete(p);
188     }
189   }
190 
191 }
192