1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.mapreduce.task.reduce; 19 20 import static org.mockito.Mockito.mock; 21 import org.apache.hadoop.fs.FileSystem; 22 import org.apache.hadoop.fs.LocalDirAllocator; 23 import org.apache.hadoop.io.compress.CompressionCodec; 24 import org.apache.hadoop.mapred.JobConf; 25 import org.apache.hadoop.mapred.MapOutputFile; 26 import org.apache.hadoop.mapred.Reporter; 27 import org.apache.hadoop.mapred.ShuffleConsumerPlugin; 28 import org.apache.hadoop.mapred.Task; 29 import org.apache.hadoop.mapred.TaskAttemptID; 30 import org.apache.hadoop.mapred.TaskStatus; 31 import org.apache.hadoop.mapred.TaskUmbilicalProtocol; 32 import org.apache.hadoop.mapred.Counters.Counter; 33 import org.apache.hadoop.mapred.Task.CombineOutputCollector; 34 import org.apache.hadoop.mapreduce.JobID; 35 import org.apache.hadoop.mapreduce.TaskID; 36 import org.apache.hadoop.mapreduce.TaskType; 37 import org.apache.hadoop.util.Progress; 38 import org.junit.Assert; 39 import org.junit.Test; 40 41 public class TestShuffleScheduler { 42 43 @SuppressWarnings("rawtypes") 44 @Test testTipFailed()45 public void testTipFailed() throws Exception { 46 JobConf job = new JobConf(); 47 job.setNumMapTasks(2); 48 49 TaskStatus status = new TaskStatus() { 50 @Override 51 public boolean getIsMap() { 52 return false; 53 } 54 55 @Override 56 public void addFetchFailedMap(TaskAttemptID mapTaskId) { 57 } 58 }; 59 Progress progress = new Progress(); 60 61 TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE, 62 0, 0); 63 ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status, 64 reduceId, null, progress, null, null, null); 65 66 JobID jobId = new JobID(); 67 TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1); 68 scheduler.tipFailed(taskId1); 69 70 Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(), 71 0.0f); 72 Assert.assertFalse(scheduler.waitUntilDone(1)); 73 74 TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0); 75 scheduler.tipFailed(taskId0); 76 Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(), 77 0.0f); 78 Assert.assertTrue(scheduler.waitUntilDone(1)); 79 } 80 81 @SuppressWarnings("rawtypes") 82 @Test TestAggregatedTransferRate()83 public <K, V> void TestAggregatedTransferRate() throws Exception { 84 JobConf job = new JobConf(); 85 job.setNumMapTasks(10); 86 //mock creation 87 TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); 88 Reporter mockReporter = mock(Reporter.class); 89 FileSystem mockFileSystem = mock(FileSystem.class); 90 Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass(); 91 @SuppressWarnings("unchecked") // needed for mock with generic 92 CombineOutputCollector<K, V> mockCombineOutputCollector = 93 (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class); 94 org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = 95 mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); 96 LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); 97 CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); 98 Counter mockCounter = mock(Counter.class); 99 TaskStatus mockTaskStatus = mock(TaskStatus.class); 100 Progress mockProgress = mock(Progress.class); 101 MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); 102 Task mockTask = mock(Task.class); 103 @SuppressWarnings("unchecked") 104 MapOutput<K, V> output = mock(MapOutput.class); 105 106 ShuffleConsumerPlugin.Context<K, V> context = 107 new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, job, mockFileSystem, 108 mockUmbilical, mockLocalDirAllocator, 109 mockReporter, mockCompressionCodec, 110 combinerClass, mockCombineOutputCollector, 111 mockCounter, mockCounter, mockCounter, 112 mockCounter, mockCounter, mockCounter, 113 mockTaskStatus, mockProgress, mockProgress, 114 mockTask, mockMapOutputFile, null); 115 TaskStatus status = new TaskStatus() { 116 @Override 117 public boolean getIsMap() { 118 return false; 119 } 120 @Override 121 public void addFetchFailedMap(TaskAttemptID mapTaskId) { 122 } 123 }; 124 Progress progress = new Progress(); 125 ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null, 126 null, progress, context.getShuffledMapsCounter(), 127 context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); 128 TaskAttemptID attemptID0 = new TaskAttemptID( 129 new org.apache.hadoop.mapred.TaskID( 130 new JobID("test",0), TaskType.MAP, 0), 0); 131 132 //adding the 1st interval, 40MB from 60s to 100s 133 long bytes = (long)40 * 1024 * 1024; 134 scheduler.copySucceeded(attemptID0, new MapHost(null, null), bytes, 60000, 100000, output); 135 Assert.assertEquals(copyMessage(1, 1, 1), progress.toString()); 136 137 TaskAttemptID attemptID1 = new TaskAttemptID( 138 new org.apache.hadoop.mapred.TaskID( 139 new JobID("test",0), TaskType.MAP, 1), 1); 140 141 //adding the 2nd interval before the 1st interval, 50MB from 0s to 50s 142 bytes = (long)50 * 1024 * 1024; 143 scheduler.copySucceeded(attemptID1, new MapHost(null, null), bytes, 0, 50000, output); 144 Assert.assertEquals(copyMessage(2, 1, 1), progress.toString()); 145 146 TaskAttemptID attemptID2 = new TaskAttemptID( 147 new org.apache.hadoop.mapred.TaskID( 148 new JobID("test",0), TaskType.MAP, 2), 2); 149 150 //adding the 3rd interval overlapping with the 1st and the 2nd interval 151 //110MB from 25s to 80s 152 bytes = (long)110 * 1024 * 1024; 153 scheduler.copySucceeded(attemptID2, new MapHost(null, null), bytes, 25000, 80000, output); 154 Assert.assertEquals(copyMessage(3, 2, 2), progress.toString()); 155 156 TaskAttemptID attemptID3 = new TaskAttemptID( 157 new org.apache.hadoop.mapred.TaskID( 158 new JobID("test",0), TaskType.MAP, 3), 3); 159 160 //adding the 4th interval just after the 2nd interval, 100MB from 100s to 300s 161 bytes = (long)100 * 1024 * 1024; 162 scheduler.copySucceeded(attemptID3, new MapHost(null, null), bytes, 100000, 300000, output); 163 Assert.assertEquals(copyMessage(4, 0.5, 1), progress.toString()); 164 165 TaskAttemptID attemptID4 = new TaskAttemptID( 166 new org.apache.hadoop.mapred.TaskID( 167 new JobID("test",0), TaskType.MAP, 4), 4); 168 169 //adding the 5th interval between after 4th, 50MB from 350s to 400s 170 bytes = (long)50 * 1024 * 1024; 171 scheduler.copySucceeded(attemptID4, new MapHost(null, null), bytes, 350000, 400000, output); 172 Assert.assertEquals(copyMessage(5, 1, 1), progress.toString()); 173 174 175 TaskAttemptID attemptID5 = new TaskAttemptID( 176 new org.apache.hadoop.mapred.TaskID( 177 new JobID("test",0), TaskType.MAP, 5), 5); 178 //adding the 6th interval between after 5th, 50MB from 450s to 500s 179 bytes = (long)50 * 1024 * 1024; 180 scheduler.copySucceeded(attemptID5, new MapHost(null, null), bytes, 450000, 500000, output); 181 Assert.assertEquals(copyMessage(6, 1, 1), progress.toString()); 182 183 TaskAttemptID attemptID6 = new TaskAttemptID( 184 new org.apache.hadoop.mapred.TaskID( 185 new JobID("test",0), TaskType.MAP, 6), 6); 186 //adding the 7th interval between after 5th and 6th interval, 20MB from 320s to 340s 187 bytes = (long)20 * 1024 * 1024; 188 scheduler.copySucceeded(attemptID6, new MapHost(null, null), bytes, 320000, 340000, output); 189 Assert.assertEquals(copyMessage(7, 1, 1), progress.toString()); 190 191 TaskAttemptID attemptID7 = new TaskAttemptID( 192 new org.apache.hadoop.mapred.TaskID( 193 new JobID("test",0), TaskType.MAP, 7), 7); 194 //adding the 8th interval overlapping with 4th, 5th, and 7th 30MB from 290s to 350s 195 bytes = (long)30 * 1024 * 1024; 196 scheduler.copySucceeded(attemptID7, new MapHost(null, null), bytes, 290000, 350000, output); 197 Assert.assertEquals(copyMessage(8, 0.5, 1), progress.toString()); 198 199 TaskAttemptID attemptID8 = new TaskAttemptID( 200 new org.apache.hadoop.mapred.TaskID( 201 new JobID("test",0), TaskType.MAP, 8), 8); 202 //adding the 9th interval overlapping with 5th and 6th, 50MB from 400s to 450s 203 bytes = (long)50 * 1024 * 1024; 204 scheduler.copySucceeded(attemptID8, new MapHost(null, null), bytes, 400000, 450000, output); 205 Assert.assertEquals(copyMessage(9, 1, 1), progress.toString()); 206 207 TaskAttemptID attemptID9 = new TaskAttemptID( 208 new org.apache.hadoop.mapred.TaskID( 209 new JobID("test",0), TaskType.MAP, 9), 9); 210 //adding the 10th interval overlapping with all intervals, 500MB from 0s to 500s 211 bytes = (long)500 * 1024 * 1024; 212 scheduler.copySucceeded(attemptID9, new MapHost(null, null), bytes, 0, 500000, output); 213 Assert.assertEquals(copyMessage(10, 1, 2), progress.toString()); 214 } 215 216 @SuppressWarnings("rawtypes") 217 @Test TestSucceedAndFailedCopyMap()218 public <K, V> void TestSucceedAndFailedCopyMap() throws Exception { 219 JobConf job = new JobConf(); 220 job.setNumMapTasks(2); 221 //mock creation 222 TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); 223 Reporter mockReporter = mock(Reporter.class); 224 FileSystem mockFileSystem = mock(FileSystem.class); 225 Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass(); 226 @SuppressWarnings("unchecked") // needed for mock with generic 227 CombineOutputCollector<K, V> mockCombineOutputCollector = 228 (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class); 229 org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = 230 mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); 231 LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); 232 CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); 233 Counter mockCounter = mock(Counter.class); 234 TaskStatus mockTaskStatus = mock(TaskStatus.class); 235 Progress mockProgress = mock(Progress.class); 236 MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); 237 Task mockTask = mock(Task.class); 238 @SuppressWarnings("unchecked") 239 MapOutput<K, V> output = mock(MapOutput.class); 240 241 ShuffleConsumerPlugin.Context<K, V> context = 242 new ShuffleConsumerPlugin.Context<K, V>( 243 mockTaskAttemptID, job, mockFileSystem, 244 mockUmbilical, mockLocalDirAllocator, 245 mockReporter, mockCompressionCodec, 246 combinerClass, mockCombineOutputCollector, 247 mockCounter, mockCounter, mockCounter, 248 mockCounter, mockCounter, mockCounter, 249 mockTaskStatus, mockProgress, mockProgress, 250 mockTask, mockMapOutputFile, null); 251 TaskStatus status = new TaskStatus() { 252 @Override 253 public boolean getIsMap() { 254 return false; 255 } 256 @Override 257 public void addFetchFailedMap(TaskAttemptID mapTaskId) { 258 } 259 }; 260 Progress progress = new Progress(); 261 ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, 262 status, null, null, progress, context.getShuffledMapsCounter(), 263 context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); 264 265 MapHost host1 = new MapHost("host1", null); 266 TaskAttemptID failedAttemptID = new TaskAttemptID( 267 new org.apache.hadoop.mapred.TaskID( 268 new JobID("test",0), TaskType.MAP, 0), 0); 269 270 TaskAttemptID succeedAttemptID = new TaskAttemptID( 271 new org.apache.hadoop.mapred.TaskID( 272 new JobID("test",0), TaskType.MAP, 1), 1); 273 274 // handle output fetch failure for failedAttemptID, part I 275 scheduler.hostFailed(host1.getHostName()); 276 277 // handle output fetch succeed for succeedAttemptID 278 long bytes = (long)500 * 1024 * 1024; 279 scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output); 280 281 // handle output fetch failure for failedAttemptID, part II 282 // for MAPREDUCE-6361: verify no NPE exception get thrown out 283 scheduler.copyFailed(failedAttemptID, host1, true, false); 284 } 285 copyMessage(int attemptNo, double rate1, double rate2)286 private static String copyMessage(int attemptNo, double rate1, double rate2) { 287 int attemptZero = attemptNo - 1; 288 return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)" 289 + " Aggregated copy rate(%d of 10 at %1.2f MB/s)", attemptZero 290 , attemptZero, rate1, attemptNo, rate2); 291 } 292 } 293