1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapred;
19 
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 
26 import org.apache.hadoop.mapreduce.ClusterMetrics;
27 import org.apache.hadoop.mapreduce.Job;
28 import org.apache.hadoop.mapreduce.TaskType;
29 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
30 
31 import junit.extensions.TestSetup;
32 import junit.framework.Test;
33 import junit.framework.TestCase;
34 import junit.framework.TestSuite;
35 
36 /**
37  * Class to test that ClusterMetrics are being created with the right
38  * counts of occupied and reserved slots.
39  *
40  * The tests exercise code paths where the counts of slots are updated.
41  */
42 public class TestClusterStatus extends TestCase {
43 
44   private static String[] trackers = new String[] { "tracker_tracker1:1000",
45       "tracker_tracker2:1000", "tracker_tracker3:1000" };
46   private static JobTracker jobTracker;
47   private static int mapSlotsPerTracker = 4;
48   private static int reduceSlotsPerTracker = 2;
49   private static MiniMRCluster mr;
50   private static JobClient client;
51   // heartbeat responseId. increment this after sending a heartbeat
52   private static short responseId = 1;
53   private static FakeJobInProgress fakeJob;
54   private static FakeTaskScheduler scheduler;
55 
suite()56   public static Test suite() {
57     TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
58       protected void setUp() throws Exception {
59         JobConf conf = new JobConf();
60         conf.setClass("mapred.jobtracker.taskScheduler",
61             TestClusterStatus.FakeTaskScheduler.class,
62                   TaskScheduler.class);
63         mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
64         jobTracker = mr.getJobTrackerRunner().getJobTracker();
65         for (String tracker : trackers) {
66           establishFirstContact(jobTracker, tracker);
67         }
68         client = new JobClient(mr.createJobConf());
69       }
70 
71       protected void tearDown() throws Exception {
72         client.close();
73         mr.shutdown();
74       }
75     };
76     return setup;
77   }
78 
79   /**
80    * Fake scheduler to test reservations.
81    *
82    * The reservations are updated incrementally in each
83    * heartbeat to pass through the re-reservation logic,
84    * until the scheduler is asked to unreserve slots.
85    */
86   static class FakeTaskScheduler extends JobQueueTaskScheduler {
87 
88     private Map<TaskTracker, Integer> reservedCounts
89       = new HashMap<TaskTracker, Integer>();
90 
91     // this variable can be set to trigger unreservations.
92     private boolean unreserveSlots;
93 
FakeTaskScheduler()94     public FakeTaskScheduler() {
95       super();
96       scheduler = this;
97     }
98 
setUnreserveSlots(boolean shouldUnreserve)99     void setUnreserveSlots(boolean shouldUnreserve) {
100       unreserveSlots = shouldUnreserve;
101     }
102 
103     @Override
assignTasks(TaskTracker tt)104     public List<Task> assignTasks(TaskTracker tt) {
105       if (unreserveSlots) {
106         tt.unreserveSlots(TaskType.MAP, fakeJob);
107         tt.unreserveSlots(TaskType.REDUCE, fakeJob);
108       } else {
109         int currCount = 1;
110         if (reservedCounts.containsKey(tt)) {
111           currCount = reservedCounts.get(tt) + 1;
112         }
113         reservedCounts.put(tt, currCount);
114         tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
115         tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
116       }
117       return new ArrayList<Task>();
118     }
119   }
120 
121   /**
122    * Fake class for JobInProgress to allow testing reservation
123    * counts.
124    *
125    * This class can only be used to test functionality related to
126    * reservations, and not other aspects of the JobInProgress code
127    * because the fields may not be initialized correctly.
128    */
129   static class FakeJobInProgress extends JobInProgress {
FakeJobInProgress(JobID jId, JobConf jobConf, JobTracker jt)130     public FakeJobInProgress(JobID jId, JobConf jobConf,
131                 JobTracker jt) throws IOException {
132       super(jId, jobConf, jt);
133     }
134   }
135 
sendHeartBeat(JobTracker jt, TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, String tracker, short responseId)136   static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status,
137       boolean initialContact, boolean acceptNewTasks,
138       String tracker, short responseId)
139       throws IOException {
140     if (status == null) {
141       status = new TaskTrackerStatus(tracker,
142       JobInProgress.convertTrackerNameToHostName(tracker));
143     }
144     jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
145     return ++responseId ;
146   }
147 
establishFirstContact(JobTracker jt, String tracker)148   static void establishFirstContact(JobTracker jt, String tracker)
149       throws IOException {
150     sendHeartBeat(jt, null, true, false, tracker, (short) 0);
151   }
152 
getTTStatus(String trackerName, List<TaskStatus> taskStatuses)153   private TaskTrackerStatus getTTStatus(String trackerName,
154       List<TaskStatus> taskStatuses) {
155     return new TaskTrackerStatus(trackerName,
156       JobInProgress.convertTrackerNameToHostName(trackerName), 0,
157       taskStatuses, 0, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
158   }
159 
testClusterMetrics()160   public void testClusterMetrics() throws IOException, InterruptedException {
161     assertEquals("tasktracker count doesn't match", trackers.length,
162       client.getClusterStatus().getTaskTrackers());
163 
164     List<TaskStatus> list = new ArrayList<TaskStatus>();
165 
166     // create a map task status, which uses 2 slots.
167     int mapSlotsPerTask = 2;
168     addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
169 
170     // create a reduce task status, which uses 1 slot.
171     int reduceSlotsPerTask = 1;
172     addReduceTaskAttemptToList(list,
173         reduceSlotsPerTask, TaskStatus.State.RUNNING);
174 
175     // create TaskTrackerStatus and send heartbeats
176     sendHeartbeats(list);
177 
178     // assert ClusterMetrics
179     ClusterMetrics metrics = jobTracker.getClusterMetrics();
180     assertEquals("occupied map slots do not match", mapSlotsPerTask,
181       metrics.getOccupiedMapSlots());
182     assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
183       metrics.getOccupiedReduceSlots());
184     assertEquals("map slot capacities do not match",
185       mapSlotsPerTracker * trackers.length,
186       metrics.getMapSlotCapacity());
187     assertEquals("reduce slot capacities do not match",
188       reduceSlotsPerTracker * trackers.length,
189       metrics.getReduceSlotCapacity());
190     assertEquals("running map tasks do not match", 1,
191       metrics.getRunningMaps());
192     assertEquals("running reduce tasks do not match", 1,
193       metrics.getRunningReduces());
194 
195     // assert the values in ClusterStatus also
196     ClusterStatus stat = client.getClusterStatus();
197     assertEquals("running map tasks do not match", 1,
198       stat.getMapTasks());
199     assertEquals("running reduce tasks do not match", 1,
200       stat.getReduceTasks());
201     assertEquals("map slot capacities do not match",
202       mapSlotsPerTracker * trackers.length,
203       stat.getMaxMapTasks());
204     assertEquals("reduce slot capacities do not match",
205       reduceSlotsPerTracker * trackers.length,
206       stat.getMaxReduceTasks());
207 
208     // send a heartbeat finishing only a map and check
209     // counts are updated.
210     list.clear();
211     addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
212     addReduceTaskAttemptToList(list,
213         reduceSlotsPerTask, TaskStatus.State.RUNNING);
214     sendHeartbeats(list);
215     metrics = jobTracker.getClusterMetrics();
216     assertEquals(0, metrics.getOccupiedMapSlots());
217     assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
218 
219     // send a heartbeat finishing the reduce task also.
220     list.clear();
221     addReduceTaskAttemptToList(list,
222         reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
223     sendHeartbeats(list);
224     metrics = jobTracker.getClusterMetrics();
225     assertEquals(0, metrics.getOccupiedReduceSlots());
226   }
227 
sendHeartbeats(List<TaskStatus> list)228   private void sendHeartbeats(List<TaskStatus> list) throws IOException {
229     TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
230     status[0] = getTTStatus(trackers[0], list);
231     status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
232     status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
233     for (int i = 0; i< trackers.length; i++) {
234       sendHeartBeat(jobTracker, status[i], false, false,
235           trackers[i], responseId);
236     }
237     responseId++;
238   }
239 
addReduceTaskAttemptToList(List<TaskStatus> list, int reduceSlotsPerTask, TaskStatus.State state)240   private void addReduceTaskAttemptToList(List<TaskStatus> list,
241       int reduceSlotsPerTask, TaskStatus.State state) {
242     TaskStatus ts = TaskStatus.createTaskStatus(false,
243       new TaskAttemptID("jt", 1, false, 0, 0), 0.0f,
244       reduceSlotsPerTask,
245       state, "", "", trackers[0],
246       TaskStatus.Phase.REDUCE, null);
247     list.add(ts);
248   }
249 
addMapTaskAttemptToList(List<TaskStatus> list, int mapSlotsPerTask, TaskStatus.State state)250   private void addMapTaskAttemptToList(List<TaskStatus> list,
251       int mapSlotsPerTask, TaskStatus.State state) {
252     TaskStatus ts = TaskStatus.createTaskStatus(true,
253       new TaskAttemptID("jt", 1, true, 0, 0), 0.0f, mapSlotsPerTask,
254       state, "", "", trackers[0],
255       TaskStatus.Phase.MAP, null);
256     list.add(ts);
257   }
258 
testReservedSlots()259   public void testReservedSlots() throws IOException {
260     JobConf conf = mr.createJobConf();
261 
262     conf.setNumReduceTasks(1);
263     conf.setSpeculativeExecution(false);
264 
265     //Set task tracker objects for reservation.
266     TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
267     TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
268     TaskTrackerStatus status1 = new TaskTrackerStatus(
269         trackers[0],JobInProgress.convertTrackerNameToHostName(
270             trackers[0]),0,new ArrayList<TaskStatus>(), 0, 0, 2, 2);
271     TaskTrackerStatus status2 = new TaskTrackerStatus(
272         trackers[1],JobInProgress.convertTrackerNameToHostName(
273             trackers[1]),0,new ArrayList<TaskStatus>(), 0, 0, 2, 2);
274     tt1.setStatus(status1);
275     tt2.setStatus(status2);
276 
277     fakeJob = new FakeJobInProgress(new JobID("jt", 1), new JobConf(conf),
278                     jobTracker);
279 
280     sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
281     sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
282     responseId++;
283     ClusterMetrics metrics = jobTracker.getClusterMetrics();
284     assertEquals("reserved map slots do not match",
285       2, metrics.getReservedMapSlots());
286     assertEquals("reserved reduce slots do not match",
287       2, metrics.getReservedReduceSlots());
288 
289     // redo to test re-reservations.
290     sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
291     sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
292     responseId++;
293     metrics = jobTracker.getClusterMetrics();
294     assertEquals("reserved map slots do not match",
295         4, metrics.getReservedMapSlots());
296     assertEquals("reserved reduce slots do not match",
297         4, metrics.getReservedReduceSlots());
298 
299     // undo reservations now.
300     scheduler.setUnreserveSlots(true);
301     sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
302     sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
303     responseId++;
304     metrics = jobTracker.getClusterMetrics();
305     assertEquals("map slots should have been unreserved",
306         0, metrics.getReservedMapSlots());
307     assertEquals("reduce slots should have been unreserved",
308         0, metrics.getReservedReduceSlots());
309   }
310 
testClusterStatus()311   public void testClusterStatus() throws Exception {
312     ClusterStatus clusterStatus = client.getClusterStatus();
313     assertEquals("JobTracker used-memory is " + clusterStatus.getUsedMemory() +
314                  ", expected " + ClusterStatus.UNINITIALIZED_MEMORY_VALUE,
315                  ClusterStatus.UNINITIALIZED_MEMORY_VALUE, clusterStatus.getUsedMemory());
316     assertEquals("JobTracker max-memory is " + clusterStatus.getMaxMemory() +
317         ", expected " + ClusterStatus.UNINITIALIZED_MEMORY_VALUE,
318         ClusterStatus.UNINITIALIZED_MEMORY_VALUE, clusterStatus.getMaxMemory());
319 
320     clusterStatus = client.getClusterStatus(false);
321     assertEquals("JobTracker used-memory is " + clusterStatus.getUsedMemory() +
322                  ", expected " + ClusterStatus.UNINITIALIZED_MEMORY_VALUE,
323                  ClusterStatus.UNINITIALIZED_MEMORY_VALUE, clusterStatus.getUsedMemory());
324     assertEquals("JobTracker max-memory is " + clusterStatus.getMaxMemory() +
325                  ", expected " + ClusterStatus.UNINITIALIZED_MEMORY_VALUE,
326                  ClusterStatus.UNINITIALIZED_MEMORY_VALUE, clusterStatus.getMaxMemory());
327 
328     clusterStatus = client.getClusterStatus(true);
329     if (ClusterStatus.UNINITIALIZED_MEMORY_VALUE == clusterStatus.getUsedMemory()) {
330       assertEquals("JobTracker used-memory is " + clusterStatus.getUsedMemory(),
331                    true, false);
332     }
333     if (ClusterStatus.UNINITIALIZED_MEMORY_VALUE == clusterStatus.getMaxMemory()) {
334       assertEquals("JobTracker max-memory is " + clusterStatus.getMaxMemory(),
335                     true, false);
336     }
337   }
338 }
339