/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import static org.mockito.Mockito.mock;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.Assert;
import org.junit.Test;

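/**
 * Tests for {@link ShuffleSchedulerImpl}: reduce-side progress when map tips
 * fail, aggregated copy-rate reporting in the progress status, and the
 * MAPREDUCE-6361 regression (no NPE when a copy fails after a success on the
 * same host).
 */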
public class TestShuffleScheduler {

  @SuppressWarnings("rawtypes")
  @Test
  public void testTipFailed() throws Exception {
    JobConf job = new JobConf();
    job.setNumMapTasks(2);

    TaskStatus status = new TaskStatus() {
      @Override
      public boolean getIsMap() {
        return false;
      }

      @Override
      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
      }
    };
    Progress progress = new Progress();

    TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
        0, 0);
    ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
        reduceId, null, progress, null, null, null);

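    // Marking one of the two expected map outputs as failed should push
    // shuffle progress to 0.5; failing the second completes the schedule
    // (progress 1.0, waitUntilDone returns true).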
    JobID jobId = new JobID();
    TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
    scheduler.tipFailed(taskId1);

    Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
        0.0f);
    Assert.assertFalse(scheduler.waitUntilDone(1));

    TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
    scheduler.tipFailed(taskId0);
    Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
        0.0f);
    Assert.assertTrue(scheduler.waitUntilDone(1));
  }

  @SuppressWarnings("rawtypes")
  @Test
  public <K, V> void testAggregatedTransferRate() throws Exception {
    JobConf job = new JobConf();
    job.setNumMapTasks(10);
    //mock creation
    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
    Reporter mockReporter = mock(Reporter.class);
    FileSystem mockFileSystem = mock(FileSystem.class);
    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = job.getCombinerClass();
    @SuppressWarnings("unchecked")  // needed for mock with generic
    CombineOutputCollector<K, V>  mockCombineOutputCollector =
        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
    Counter mockCounter = mock(Counter.class);
    TaskStatus mockTaskStatus = mock(TaskStatus.class);
    Progress mockProgress = mock(Progress.class);
    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
    Task mockTask = mock(Task.class);
    @SuppressWarnings("unchecked")
    MapOutput<K, V> output = mock(MapOutput.class);

    ShuffleConsumerPlugin.Context<K, V> context =
        new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, job, mockFileSystem,
                                                mockUmbilical, mockLocalDirAllocator,
                                                mockReporter, mockCompressionCodec,
                                                combinerClass, mockCombineOutputCollector,
                                                mockCounter, mockCounter, mockCounter,
                                                mockCounter, mockCounter, mockCounter,
                                                mockTaskStatus, mockProgress, mockProgress,
                                                mockTask, mockMapOutputFile, null);
    TaskStatus status = new TaskStatus() {
      @Override
      public boolean getIsMap() {
        return false;
      }
      @Override
      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
      }
    };
    Progress progress = new Progress();
    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null,
        null, progress, context.getShuffledMapsCounter(),
        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
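
    // Each copySucceeded call below reports a copy of `bytes` bytes over the
    // interval [startMillis, endMillis]. The expected progress messages carry
    // two rates: the individual copy's rate (bytes / interval length) and the
    // aggregated rate, which works out to total bytes copied / union of all
    // copy intervals so far; the test exercises this with overlapping,
    // adjacent, and disjoint intervals.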
    TaskAttemptID attemptID0 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 0), 0);

    //adding the 1st interval, 40MB from 60s to 100s
    long bytes = (long)40 * 1024 * 1024;
    scheduler.copySucceeded(attemptID0, new MapHost(null, null), bytes, 60000, 100000, output);
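    // copy rate: 40MB / 40s = 1 MB/s; aggregated: 40MB over 40s busy time = 1 MB/s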
    Assert.assertEquals(copyMessage(1, 1, 1), progress.toString());

    TaskAttemptID attemptID1 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 1), 1);

    //adding the 2nd interval before the 1st interval, 50MB from 0s to 50s
    bytes = (long)50 * 1024 * 1024;
    scheduler.copySucceeded(attemptID1, new MapHost(null, null), bytes, 0, 50000, output);
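    // copy rate: 50MB / 50s = 1 MB/s; aggregated: 90MB over 90s busy time = 1 MB/s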
    Assert.assertEquals(copyMessage(2, 1, 1), progress.toString());

    TaskAttemptID attemptID2 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 2), 2);

    //adding the 3rd interval overlapping with the 1st and the 2nd interval
    //110MB from 25s to 80s
    bytes = (long)110 * 1024 * 1024;
    scheduler.copySucceeded(attemptID2, new MapHost(null, null), bytes, 25000, 80000, output);
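    // copy rate: 110MB / 55s = 2 MB/s; aggregated: 200MB over 100s busy time = 2 MB/s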
    Assert.assertEquals(copyMessage(3, 2, 2), progress.toString());

    TaskAttemptID attemptID3 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 3), 3);

    //adding the 4th interval just after the 1st interval, 100MB from 100s to 300s
    bytes = (long)100 * 1024 * 1024;
    scheduler.copySucceeded(attemptID3, new MapHost(null, null), bytes, 100000, 300000, output);
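    // copy rate: 100MB / 200s = 0.5 MB/s; aggregated: 300MB over 300s busy time = 1 MB/s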
    Assert.assertEquals(copyMessage(4, 0.5, 1), progress.toString());

    TaskAttemptID attemptID4 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 4), 4);

    //adding the 5th interval after the 4th, 50MB from 350s to 400s
    bytes = (long)50 * 1024 * 1024;
    scheduler.copySucceeded(attemptID4, new MapHost(null, null), bytes, 350000, 400000, output);
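    // copy rate: 50MB / 50s = 1 MB/s; aggregated: 350MB over 350s busy time = 1 MB/s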
    Assert.assertEquals(copyMessage(5, 1, 1), progress.toString());

    TaskAttemptID attemptID5 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 5), 5);
    //adding the 6th interval after the 5th, 50MB from 450s to 500s
    bytes = (long)50 * 1024 * 1024;
    scheduler.copySucceeded(attemptID5, new MapHost(null, null), bytes, 450000, 500000, output);
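    // copy rate: 50MB / 50s = 1 MB/s; aggregated: 400MB over 400s busy time = 1 MB/s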
    Assert.assertEquals(copyMessage(6, 1, 1), progress.toString());

    TaskAttemptID attemptID6 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 6), 6);
    //adding the 7th interval in the gap between the 4th and the 5th, 20MB from 320s to 340s
    bytes = (long)20 * 1024 * 1024;
    scheduler.copySucceeded(attemptID6, new MapHost(null, null), bytes, 320000, 340000, output);
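    // copy rate: 20MB / 20s = 1 MB/s; aggregated: 420MB over 420s busy time = 1 MB/s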
    Assert.assertEquals(copyMessage(7, 1, 1), progress.toString());

    TaskAttemptID attemptID7 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 7), 7);
    //adding the 8th interval overlapping with the 4th, 5th, and 7th, 30MB from 290s to 350s
    bytes = (long)30 * 1024 * 1024;
    scheduler.copySucceeded(attemptID7, new MapHost(null, null), bytes, 290000, 350000, output);
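    // copy rate: 30MB / 60s = 0.5 MB/s; aggregated: 450MB over 450s busy time = 1 MB/s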
    Assert.assertEquals(copyMessage(8, 0.5, 1), progress.toString());

    TaskAttemptID attemptID8 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 8), 8);
    //adding the 9th interval between the 5th and the 6th, 50MB from 400s to 450s
    bytes = (long)50 * 1024 * 1024;
    scheduler.copySucceeded(attemptID8, new MapHost(null, null), bytes, 400000, 450000, output);
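    // copy rate: 50MB / 50s = 1 MB/s; aggregated: 500MB over 500s busy time = 1 MB/s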
    Assert.assertEquals(copyMessage(9, 1, 1), progress.toString());

    TaskAttemptID attemptID9 = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 9), 9);
    //adding the 10th interval overlapping with all intervals, 500MB from 0s to 500s
    bytes = (long)500 * 1024 * 1024;
    scheduler.copySucceeded(attemptID9, new MapHost(null, null), bytes, 0, 500000, output);
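    // copy rate: 500MB / 500s = 1 MB/s; aggregated: 1000MB over 500s busy time = 2 MB/s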
    Assert.assertEquals(copyMessage(10, 1, 2), progress.toString());
  }

  @SuppressWarnings("rawtypes")
  @Test
  public <K, V> void testSucceedAndFailedCopyMap() throws Exception {
    JobConf job = new JobConf();
    job.setNumMapTasks(2);
    //mock creation
    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
    Reporter mockReporter = mock(Reporter.class);
    FileSystem mockFileSystem = mock(FileSystem.class);
    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = job.getCombinerClass();
    @SuppressWarnings("unchecked")  // needed for mock with generic
    CombineOutputCollector<K, V>  mockCombineOutputCollector =
        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
    Counter mockCounter = mock(Counter.class);
    TaskStatus mockTaskStatus = mock(TaskStatus.class);
    Progress mockProgress = mock(Progress.class);
    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
    Task mockTask = mock(Task.class);
    @SuppressWarnings("unchecked")
    MapOutput<K, V> output = mock(MapOutput.class);

    ShuffleConsumerPlugin.Context<K, V> context =
        new ShuffleConsumerPlugin.Context<K, V>(
            mockTaskAttemptID, job, mockFileSystem,
            mockUmbilical, mockLocalDirAllocator,
            mockReporter, mockCompressionCodec,
            combinerClass, mockCombineOutputCollector,
            mockCounter, mockCounter, mockCounter,
            mockCounter, mockCounter, mockCounter,
            mockTaskStatus, mockProgress, mockProgress,
            mockTask, mockMapOutputFile, null);
    TaskStatus status = new TaskStatus() {
      @Override
      public boolean getIsMap() {
        return false;
      }
      @Override
      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
      }
    };
    Progress progress = new Progress();
    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
        status, null, null, progress, context.getShuffledMapsCounter(),
        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());

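    // MAPREDUCE-6361 scenario: report a host failure, then a successful copy
    // from the same host (which clears the recorded host-failure state), then
    // a fetch failure for that host; copyFailed must handle the cleared state
    // instead of throwing an NPE.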
    MapHost host1 = new MapHost("host1", null);
    TaskAttemptID failedAttemptID = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 0), 0);

    TaskAttemptID succeedAttemptID = new TaskAttemptID(
        new org.apache.hadoop.mapred.TaskID(
        new JobID("test",0), TaskType.MAP, 1), 1);

    // handle output fetch failure for failedAttemptID, part I
    scheduler.hostFailed(host1.getHostName());

    // handle output fetch succeed for succeedAttemptID
    long bytes = (long)500 * 1024 * 1024;
    scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);

    // handle output fetch failure for failedAttemptID, part II
    // for MAPREDUCE-6361: verify that no NPE is thrown
    scheduler.copyFailed(failedAttemptID, host1, true, false);
  }

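  /**
   * Builds the expected progress string: {@code rate1} is the rate of the
   * copy that just finished, {@code rate2} the aggregated rate across all
   * copies so far.
   */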
  private static String copyMessage(int attemptNo, double rate1, double rate2) {
    int attemptZero = attemptNo - 1;
    return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"
        + " Aggregated copy rate(%d of 10 at %1.2f MB/s)",
        attemptZero, attemptZero, rate1, attemptNo, rate2);
  }
}