/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;

import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
 * Class to test that ClusterMetrics are being created with the right
 * counts of occupied and reserved slots.
 *
 * The tests exercise code paths where the counts of slots are updated.
 */
public class TestClusterStatus extends TestCase {

  private static String[] trackers = new String[] { "tracker_tracker1:1000",
      "tracker_tracker2:1000", "tracker_tracker3:1000" };
  private static JobTracker jobTracker;
  private static int mapSlotsPerTracker = 4;
  private static int reduceSlotsPerTracker = 2;
  private static MiniMRCluster mr;
  private static JobClient client;
  // heartbeat responseId. increment this after sending a heartbeat
  // NOTE(review): this counter is static and shared by every test and every
  // tracker, while testReservedSlots only heartbeats the first two trackers.
  // If the JobTracker validates response ids per tracker, the tests may
  // depend on execution order -- confirm.
  private static short responseId = 1;
  private static FakeJobInProgress fakeJob;
  private static FakeTaskScheduler scheduler;

  /**
   * Builds the suite, wrapping all tests of this class in a one-time
   * set-up/tear-down that starts a mini MR cluster configured with
   * {@link FakeTaskScheduler}, registers every entry of {@code trackers}
   * with the job tracker and opens a {@link JobClient}.
   *
   * @return the decorated test suite
   */
  public static Test suite() {
    TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
      @Override
      protected void setUp() throws Exception {
        JobConf conf = new JobConf();
        conf.setClass("mapred.jobtracker.taskScheduler",
                      TestClusterStatus.FakeTaskScheduler.class,
                      TaskScheduler.class);
        mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
        try {
          jobTracker = mr.getJobTrackerRunner().getJobTracker();
          for (String tracker : trackers) {
            establishFirstContact(jobTracker, tracker);
          }
          client = new JobClient(mr.createJobConf());
        } catch (Exception e) {
          // TestSetup does not invoke tearDown() when setUp() fails, so the
          // cluster has to be stopped here or it would be leaked.
          mr.shutdown();
          throw e;
        }
      }

      @Override
      protected void tearDown() throws Exception {
        try {
          client.close();
        } finally {
          // Always stop the cluster, even if closing the client failed.
          mr.shutdown();
        }
      }
    };
    return setup;
  }

  /**
   * Fake scheduler to test reservations.
   *
   * The reservations are updated incrementally in each
   * heartbeat to pass through the re-reservation logic,
   * until the scheduler is asked to unreserve slots.
   */
  static class FakeTaskScheduler extends JobQueueTaskScheduler {

    // number of slots reserved so far on each tracker, per task type
    private Map<TaskTracker, Integer> reservedCounts
      = new HashMap<TaskTracker, Integer>();

    // this variable can be set to trigger unreservations.
    private boolean unreserveSlots;

    /**
     * Creates the scheduler and publishes it through the enclosing test's
     * static {@code scheduler} field so the tests can reach it.
     */
    public FakeTaskScheduler() {
      super();
      scheduler = this;
    }

    /**
     * Selects what subsequent {@link #assignTasks(TaskTracker)} calls do.
     *
     * @param shouldUnreserve {@code true} to unreserve slots on each call,
     *                        {@code false} to keep reserving more
     */
    void setUnreserveSlots(boolean shouldUnreserve) {
      unreserveSlots = shouldUnreserve;
    }

    /**
     * Never assigns a task. Depending on the unreserve flag it either drops
     * the map and reduce reservations held for {@code fakeJob} on the given
     * tracker, or reserves one more map and one more reduce slot than on the
     * previous call for that tracker (starting at 1).
     *
     * @param tt the tracker that is asking for work
     * @return always an empty list
     */
    @Override
    public List<Task> assignTasks(TaskTracker tt) {
      if (unreserveSlots) {
        tt.unreserveSlots(TaskType.MAP, fakeJob);
        tt.unreserveSlots(TaskType.REDUCE, fakeJob);
      } else {
        // single lookup: an absent entry means this is the first reservation
        Integer previous = reservedCounts.get(tt);
        int currCount = (previous == null) ? 1 : previous + 1;
        reservedCounts.put(tt, currCount);
        tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
        tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
      }
      return new ArrayList<Task>();
    }
  }

  /**
   * Fake class for JobInProgress to allow testing reservation
   * counts.
   *
   * This class can only be used to test functionality related to
   * reservations, and not other aspects of the JobInProgress code
   * because the fields may not be initialized correctly.
   */
  static class FakeJobInProgress extends JobInProgress {
    /**
     * Delegates directly to the {@link JobInProgress} constructor.
     *
     * @param jId     id of the fake job
     * @param jobConf configuration of the fake job
     * @param jt      job tracker the job belongs to
     * @throws IOException if the superclass constructor fails
     */
    public FakeJobInProgress(JobID jId, JobConf jobConf,
        JobTracker jt) throws IOException {
      super(jId, jobConf, jt);
    }
  }

  /**
   * Sends one heartbeat to the job tracker.
   *
   * @param jt             job tracker to send the heartbeat to
   * @param status         status to report; when {@code null} a status is
   *                       created from the tracker name and its host name
   * @param initialContact forwarded to {@code JobTracker.heartbeat}
   * @param acceptNewTasks forwarded to {@code JobTracker.heartbeat}
   * @param tracker        tracker name, only used to build a status when
   *                       {@code status} is {@code null}
   * @param responseId     response id sent with the heartbeat
   * @return {@code responseId} incremented by one
   * @throws IOException if the heartbeat call fails
   */
  static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status,
                             boolean initialContact, boolean acceptNewTasks,
                             String tracker, short responseId)
      throws IOException {
    if (status == null) {
      status = new TaskTrackerStatus(tracker,
          JobInProgress.convertTrackerNameToHostName(tracker));
    }
    jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
    return ++responseId;
  }

  /**
   * Sends the initial-contact heartbeat (response id 0, default status,
   * not accepting tasks) for the given tracker.
   *
   * @param jt      job tracker to contact
   * @param tracker name of the tracker to register
   * @throws IOException if the heartbeat call fails
   */
  static void establishFirstContact(JobTracker jt, String tracker)
      throws IOException {
    sendHeartBeat(jt, null, true, false, tracker, (short) 0);
  }

  /**
   * Creates a tracker status carrying the given task statuses and the
   * per-tracker slot counts of this test; the remaining numeric constructor
   * arguments are passed as 0.
   *
   * @param trackerName  name of the tracker
   * @param taskStatuses task statuses the tracker reports
   * @return the new status
   */
  private TaskTrackerStatus getTTStatus(String trackerName,
      List<TaskStatus> taskStatuses) {
    return new TaskTrackerStatus(trackerName,
        JobInProgress.convertTrackerNameToHostName(trackerName), 0,
        taskStatuses, 0, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
  }

  /**
   * Reports one running map attempt (2 slots) and one running reduce
   * attempt (1 slot) on the first tracker, checks the occupied-slot,
   * capacity and running-task numbers in ClusterMetrics and ClusterStatus,
   * then finishes the map and the reduce in turn and checks that the
   * occupied-slot counts drop accordingly.
   */
  public void testClusterMetrics() throws IOException, InterruptedException {
    assertEquals("tasktracker count doesn't match", trackers.length,
        client.getClusterStatus().getTaskTrackers());

    List<TaskStatus> list = new ArrayList<TaskStatus>();

    // create a map task status, which uses 2 slots.
    int mapSlotsPerTask = 2;
    addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);

    // create a reduce task status, which uses 1 slot.
    int reduceSlotsPerTask = 1;
    addReduceTaskAttemptToList(list,
        reduceSlotsPerTask, TaskStatus.State.RUNNING);

    // create TaskTrackerStatus and send heartbeats
    sendHeartbeats(list);

    // assert ClusterMetrics
    ClusterMetrics metrics = jobTracker.getClusterMetrics();
    assertEquals("occupied map slots do not match", mapSlotsPerTask,
        metrics.getOccupiedMapSlots());
    assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
        metrics.getOccupiedReduceSlots());
    assertEquals("map slot capacities do not match",
        mapSlotsPerTracker * trackers.length,
        metrics.getMapSlotCapacity());
    assertEquals("reduce slot capacities do not match",
        reduceSlotsPerTracker * trackers.length,
        metrics.getReduceSlotCapacity());
    assertEquals("running map tasks do not match", 1,
        metrics.getRunningMaps());
    assertEquals("running reduce tasks do not match", 1,
        metrics.getRunningReduces());

    // assert the values in ClusterStatus also
    ClusterStatus stat = client.getClusterStatus();
    assertEquals("running map tasks do not match", 1,
        stat.getMapTasks());
    assertEquals("running reduce tasks do not match", 1,
        stat.getReduceTasks());
    assertEquals("map slot capacities do not match",
        mapSlotsPerTracker * trackers.length,
        stat.getMaxMapTasks());
    assertEquals("reduce slot capacities do not match",
        reduceSlotsPerTracker * trackers.length,
        stat.getMaxReduceTasks());

    // send a heartbeat finishing only a map and check
    // counts are updated.
    list.clear();
    addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
    addReduceTaskAttemptToList(list,
        reduceSlotsPerTask, TaskStatus.State.RUNNING);
    sendHeartbeats(list);
    metrics = jobTracker.getClusterMetrics();
    assertEquals(0, metrics.getOccupiedMapSlots());
    assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());

    // send a heartbeat finishing the reduce task also.
    list.clear();
    addReduceTaskAttemptToList(list,
        reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
    sendHeartbeats(list);
    metrics = jobTracker.getClusterMetrics();
    assertEquals(0, metrics.getOccupiedReduceSlots());
  }

  /**
   * Sends one heartbeat per configured tracker, none of them accepting new
   * tasks, and then advances the shared response id. The first tracker
   * reports the given task statuses; every other tracker reports none.
   *
   * @param list task statuses reported by the first tracker
   * @throws IOException if a heartbeat call fails
   */
  private void sendHeartbeats(List<TaskStatus> list) throws IOException {
    for (int i = 0; i < trackers.length; i++) {
      // only the first tracker runs tasks in these tests
      List<TaskStatus> reported =
          (i == 0) ? list : new ArrayList<TaskStatus>();
      sendHeartBeat(jobTracker, getTTStatus(trackers[i], reported),
          false, false, trackers[i], responseId);
    }
    responseId++;
  }

  /**
   * Appends a reduce-attempt status (phase REDUCE, progress 0, attributed
   * to the first tracker) to the list.
   *
   * @param list               list to append to
   * @param reduceSlotsPerTask number of slots the attempt occupies
   * @param state              run state of the attempt
   */
  private void addReduceTaskAttemptToList(List<TaskStatus> list,
      int reduceSlotsPerTask, TaskStatus.State state) {
    TaskStatus ts = TaskStatus.createTaskStatus(false,
        new TaskAttemptID("jt", 1, false, 0, 0), 0.0f,
        reduceSlotsPerTask,
        state, "", "", trackers[0],
        TaskStatus.Phase.REDUCE, null);
    list.add(ts);
  }

  /**
   * Appends a map-attempt status (phase MAP, progress 0, attributed to the
   * first tracker) to the list.
   *
   * @param list            list to append to
   * @param mapSlotsPerTask number of slots the attempt occupies
   * @param state           run state of the attempt
   */
  private void addMapTaskAttemptToList(List<TaskStatus> list,
      int mapSlotsPerTask, TaskStatus.State state) {
    TaskStatus ts = TaskStatus.createTaskStatus(true,
        new TaskAttemptID("jt", 1, true, 0, 0), 0.0f, mapSlotsPerTask,
        state, "", "", trackers[0],
        TaskStatus.Phase.MAP, null);
    list.add(ts);
  }

  /**
   * Sends one task-accepting heartbeat for each of the first two trackers
   * and then advances the shared response id.
   *
   * @param status1 status reported for the first tracker
   * @param status2 status reported for the second tracker
   * @throws IOException if a heartbeat call fails
   */
  private void sendTaskAcceptingHeartbeats(TaskTrackerStatus status1,
      TaskTrackerStatus status2) throws IOException {
    sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
    sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
    responseId++;
  }

  /**
   * Drives the fake scheduler through two reservation rounds and one
   * unreservation round on the first two trackers and checks the reserved
   * map/reduce slot totals reported by ClusterMetrics after each round
   * (2, then 4, then 0).
   */
  public void testReservedSlots() throws IOException {
    JobConf conf = mr.createJobConf();

    conf.setNumReduceTasks(1);
    conf.setSpeculativeExecution(false);

    //Set task tracker objects for reservation.
    TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
    TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
    TaskTrackerStatus status1 = new TaskTrackerStatus(
        trackers[0], JobInProgress.convertTrackerNameToHostName(
            trackers[0]), 0, new ArrayList<TaskStatus>(), 0, 0, 2, 2);
    TaskTrackerStatus status2 = new TaskTrackerStatus(
        trackers[1], JobInProgress.convertTrackerNameToHostName(
            trackers[1]), 0, new ArrayList<TaskStatus>(), 0, 0, 2, 2);
    tt1.setStatus(status1);
    tt2.setStatus(status2);

    fakeJob = new FakeJobInProgress(new JobID("jt", 1), new JobConf(conf),
                    jobTracker);

    sendTaskAcceptingHeartbeats(status1, status2);
    ClusterMetrics metrics = jobTracker.getClusterMetrics();
    assertEquals("reserved map slots do not match",
      2, metrics.getReservedMapSlots());
    assertEquals("reserved reduce slots do not match",
      2, metrics.getReservedReduceSlots());

    // redo to test re-reservations.
    sendTaskAcceptingHeartbeats(status1, status2);
    metrics = jobTracker.getClusterMetrics();
    assertEquals("reserved map slots do not match",
        4, metrics.getReservedMapSlots());
    assertEquals("reserved reduce slots do not match",
        4, metrics.getReservedReduceSlots());

    // undo reservations now.
    scheduler.setUnreserveSlots(true);
    sendTaskAcceptingHeartbeats(status1, status2);
    metrics = jobTracker.getClusterMetrics();
    assertEquals("map slots should have been unreserved",
        0, metrics.getReservedMapSlots());
    assertEquals("reduce slots should have been unreserved",
        0, metrics.getReservedReduceSlots());
  }

  /**
   * Asserts that both memory figures of the given status still hold
   * {@code ClusterStatus.UNINITIALIZED_MEMORY_VALUE}.
   *
   * @param clusterStatus status to inspect
   */
  private static void assertMemoryUninitialized(ClusterStatus clusterStatus) {
    assertEquals("JobTracker used-memory is " + clusterStatus.getUsedMemory() +
        ", expected " + ClusterStatus.UNINITIALIZED_MEMORY_VALUE,
        ClusterStatus.UNINITIALIZED_MEMORY_VALUE, clusterStatus.getUsedMemory());
    assertEquals("JobTracker max-memory is " + clusterStatus.getMaxMemory() +
        ", expected " + ClusterStatus.UNINITIALIZED_MEMORY_VALUE,
        ClusterStatus.UNINITIALIZED_MEMORY_VALUE, clusterStatus.getMaxMemory());
  }

  /**
   * Checks that the memory figures are left uninitialized by
   * {@code getClusterStatus()} and {@code getClusterStatus(false)}, and are
   * filled in by {@code getClusterStatus(true)}.
   */
  public void testClusterStatus() throws Exception {
    assertMemoryUninitialized(client.getClusterStatus());
    assertMemoryUninitialized(client.getClusterStatus(false));

    ClusterStatus clusterStatus = client.getClusterStatus(true);
    assertFalse("JobTracker used-memory is " + clusterStatus.getUsedMemory(),
        ClusterStatus.UNINITIALIZED_MEMORY_VALUE == clusterStatus.getUsedMemory());
    assertFalse("JobTracker max-memory is " + clusterStatus.getMaxMemory(),
        ClusterStatus.UNINITIALIZED_MEMORY_VALUE == clusterStatus.getMaxMemory());
  }
}