1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.mapreduce.v2.app.rm;
20 
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.LinkedHashMap;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Set;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.classification.InterfaceAudience.Private;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.io.Text;
43 import org.apache.hadoop.mapreduce.JobCounter;
44 import org.apache.hadoop.mapreduce.MRJobConfig;
45 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
46 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
47 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
48 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
49 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
50 import org.apache.hadoop.mapreduce.v2.app.AppContext;
51 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
52 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
53 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
54 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
55 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
56 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
57 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
58 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
59 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
60 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
61 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
62 import org.apache.hadoop.security.UserGroupInformation;
63 import org.apache.hadoop.util.StringInterner;
64 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
65 import org.apache.hadoop.yarn.api.records.Container;
66 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
67 import org.apache.hadoop.yarn.api.records.ContainerId;
68 import org.apache.hadoop.yarn.api.records.ContainerStatus;
69 import org.apache.hadoop.yarn.api.records.NMToken;
70 import org.apache.hadoop.yarn.api.records.NodeId;
71 import org.apache.hadoop.yarn.api.records.NodeReport;
72 import org.apache.hadoop.yarn.api.records.NodeState;
73 import org.apache.hadoop.yarn.api.records.Priority;
74 import org.apache.hadoop.yarn.api.records.Resource;
75 import org.apache.hadoop.yarn.api.records.Token;
76 import org.apache.hadoop.yarn.client.ClientRMProxy;
77 import org.apache.hadoop.yarn.client.api.NMTokenCache;
78 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
79 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
80 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
81 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
82 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
83 import org.apache.hadoop.yarn.util.Clock;
84 import org.apache.hadoop.yarn.util.RackResolver;
85 import org.apache.hadoop.yarn.util.resource.Resources;
86 
87 import com.google.common.annotations.VisibleForTesting;
88 
89 /**
90  * Allocates containers from the ResourceManager scheduler.
91  */
92 public class RMContainerAllocator extends RMContainerRequestor
93     implements ContainerAllocator {
94 
95   static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
96 
97   public static final
98   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
99 
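  // Lower numeric values are served first by the YARN scheduler, so requests for
  // previously failed ("fast fail") maps take precedence over reduces, which in
  // turn take precedence over normal map requests.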
100   static final Priority PRIORITY_FAST_FAIL_MAP;
101   static final Priority PRIORITY_REDUCE;
102   static final Priority PRIORITY_MAP;
103 
104   @VisibleForTesting
105   public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
106       + "to make room for pending map attempts";
107 
108   private Thread eventHandlingThread;
109   private final AtomicBoolean stopped;
110 
111   static {
112     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
113     PRIORITY_FAST_FAIL_MAP.setPriority(5);
114     PRIORITY_REDUCE = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
115     PRIORITY_REDUCE.setPriority(10);
116     PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
117     PRIORITY_MAP.setPriority(20);
118   }
119 
120   /*
121   Vocabulary Used:
122   pending -> requests which are NOT yet sent to RM
123   scheduled -> requests which are sent to RM but not yet assigned
124   assigned -> requests which are assigned to a container
125   completed -> requests whose corresponding containers have completed
126 
127   Lifecycle of map
128   scheduled->assigned->completed
129 
130   Lifecycle of reduce
131   pending->scheduled->assigned->completed
132 
133   Maps are scheduled as soon as their requests are received. Reduces are
134   added to the pending list and are ramped up (added to scheduled) based
135   on completed maps and current availability in the cluster.
136   */
137 
138   //reduces which are not yet scheduled
139   private final LinkedList<ContainerRequest> pendingReduces =
140     new LinkedList<ContainerRequest>();
141 
142   //holds information about containers assigned to task attempts
143   private final AssignedRequests assignedRequests = new AssignedRequests();
144 
145   //holds scheduled requests to be fulfilled by RM
146   private final ScheduledRequests scheduledRequests = new ScheduledRequests();
147 
148   private int containersAllocated = 0;
149   private int containersReleased = 0;
150   private int hostLocalAssigned = 0;
151   private int rackLocalAssigned = 0;
152   private int lastCompletedTasks = 0;
153 
154   private boolean recalculateReduceSchedule = false;
155   private Resource mapResourceRequest = Resources.none();
156   private Resource reduceResourceRequest = Resources.none();
157 
158   private boolean reduceStarted = false;
159   private float maxReduceRampupLimit = 0;
160   private float maxReducePreemptionLimit = 0;
161   /**
162    * If a container request has not been allocated within this threshold
163    * (in milliseconds), it is considered delayed.
164    */
165   private long allocationDelayThresholdMs = 0;
166   private float reduceSlowStart = 0;
167   private int maxRunningMaps = 0;
168   private int maxRunningReduces = 0;
169   private long retryInterval;
170   private long retrystartTime;
171   private Clock clock;
172 
173   @VisibleForTesting
174   protected BlockingQueue<ContainerAllocatorEvent> eventQueue
175     = new LinkedBlockingQueue<ContainerAllocatorEvent>();
176 
177   private ScheduleStats scheduleStats = new ScheduleStats();
178 
179   public RMContainerAllocator(ClientService clientService, AppContext context) {
180     super(clientService, context);
181     this.stopped = new AtomicBoolean(false);
182     this.clock = context.getClock();
183   }
184 
185   @Override
186   protected void serviceInit(Configuration conf) throws Exception {
187     super.serviceInit(conf);
188     reduceSlowStart = conf.getFloat(
189         MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
190         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);
191     maxReduceRampupLimit = conf.getFloat(
192         MRJobConfig.MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT,
193         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT);
194     maxReducePreemptionLimit = conf.getFloat(
195         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
196         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
197     allocationDelayThresholdMs = conf.getInt(
198         MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
199         MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
200     maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
201         MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
202     maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
203         MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
204     RackResolver.init(conf);
205     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
206                                 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
207     // Init startTime to current time. If all goes well, it will be reset after
208     // first attempt to contact RM.
209     retrystartTime = System.currentTimeMillis();
210   }
211 
212   @Override
213   protected void serviceStart() throws Exception {
214     this.eventHandlingThread = new Thread() {
215       @SuppressWarnings("unchecked")
216       @Override
217       public void run() {
218 
219         ContainerAllocatorEvent event;
220 
221         while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
222           try {
223             event = RMContainerAllocator.this.eventQueue.take();
224           } catch (InterruptedException e) {
225             if (!stopped.get()) {
226               LOG.error("Returning, interrupted : " + e);
227             }
228             return;
229           }
230 
231           try {
232             handleEvent(event);
233           } catch (Throwable t) {
234             LOG.error("Error in handling event type " + event.getType()
235                 + " to the ContainerAllocator", t);
236             // Kill the AM
237             eventHandler.handle(new JobEvent(getJob().getID(),
238               JobEventType.INTERNAL_ERROR));
239             return;
240           }
241         }
242       }
243     };
244     this.eventHandlingThread.start();
245     super.serviceStart();
246   }
247 
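  // Called from the periodic heartbeat to the RM: assign any newly allocated
  // containers, then re-evaluate the reduce ramp-up/ramp-down whenever task
  // completion counts or the set of scheduled maps has changed.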
248   @Override
249   protected synchronized void heartbeat() throws Exception {
250     scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
251     List<Container> allocatedContainers = getResources();
252     if (allocatedContainers != null && allocatedContainers.size() > 0) {
253       scheduledRequests.assign(allocatedContainers);
254     }
255 
256     int completedMaps = getJob().getCompletedMaps();
257     int completedTasks = completedMaps + getJob().getCompletedReduces();
258     if ((lastCompletedTasks != completedTasks) ||
259           (scheduledRequests.maps.size() > 0)) {
260       lastCompletedTasks = completedTasks;
261       recalculateReduceSchedule = true;
262     }
263 
264     if (recalculateReduceSchedule) {
265       preemptReducesIfNeeded();
266       scheduleReduces(
267           getJob().getTotalMaps(), completedMaps,
268           scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
269           assignedRequests.maps.size(), assignedRequests.reduces.size(),
270           mapResourceRequest, reduceResourceRequest,
271           pendingReduces.size(),
272           maxReduceRampupLimit, reduceSlowStart);
273       recalculateReduceSchedule = false;
274     }
275 
276     scheduleStats.updateAndLogIfChanged("After Scheduling: ");
277   }
278 
279   @Override
280   protected void serviceStop() throws Exception {
281     if (stopped.getAndSet(true)) {
282       // return if already stopped
283       return;
284     }
285     if (eventHandlingThread != null) {
286       eventHandlingThread.interrupt();
287     }
288     super.serviceStop();
289     scheduleStats.log("Final Stats: ");
290   }
291 
292   @Private
293   @VisibleForTesting
294   AssignedRequests getAssignedRequests() {
295     return assignedRequests;
296   }
297 
298   @Private
299   @VisibleForTesting
300   ScheduledRequests getScheduledRequests() {
301     return scheduledRequests;
302   }
303 
304   public boolean getIsReduceStarted() {
305     return reduceStarted;
306   }
307 
308   public void setIsReduceStarted(boolean reduceStarted) {
309     this.reduceStarted = reduceStarted;
310   }
311 
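  // handle() only enqueues the event; the event-handling thread started in
  // serviceStart() drains the queue and invokes handleEvent() synchronously.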
312   @Override
313   public void handle(ContainerAllocatorEvent event) {
314     int qSize = eventQueue.size();
315     if (qSize != 0 && qSize % 1000 == 0) {
316       LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
317     }
318     int remCapacity = eventQueue.remainingCapacity();
319     if (remCapacity < 1000) {
320       LOG.warn("Very low remaining capacity in the event-queue "
321           + "of RMContainerAllocator: " + remCapacity);
322     }
323     try {
324       eventQueue.put(event);
325     } catch (InterruptedException e) {
326       throw new YarnRuntimeException(e);
327     }
328   }
329 
330   @SuppressWarnings({ "unchecked" })
331   protected synchronized void handleEvent(ContainerAllocatorEvent event) {
332     recalculateReduceSchedule = true;
333     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
334       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
335       JobId jobId = getJob().getID();
336       Resource supportedMaxContainerCapability = getMaxContainerCapability();
337       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
338         if (mapResourceRequest.equals(Resources.none())) {
339           mapResourceRequest = reqEvent.getCapability();
340           eventHandler.handle(new JobHistoryEvent(jobId,
341             new NormalizedResourceEvent(
342               org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
343                 .getMemory())));
344           LOG.info("mapResourceRequest:" + mapResourceRequest);
345           if (mapResourceRequest.getMemory() > supportedMaxContainerCapability
346             .getMemory()
347               || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
348                 .getVirtualCores()) {
349             String diagMsg =
350                 "MAP capability required is more than the supported "
351                     + "max container capability in the cluster. Killing the Job. mapResourceRequest: "
352                     + mapResourceRequest + " maxContainerCapability:"
353                     + supportedMaxContainerCapability;
354             LOG.info(diagMsg);
355             eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
356             eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
357           }
358         }
359         // set the resources
360         reqEvent.getCapability().setMemory(mapResourceRequest.getMemory());
361         reqEvent.getCapability().setVirtualCores(
362           mapResourceRequest.getVirtualCores());
363         scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
364       } else {
365         if (reduceResourceRequest.equals(Resources.none())) {
366           reduceResourceRequest = reqEvent.getCapability();
367           eventHandler.handle(new JobHistoryEvent(jobId,
368             new NormalizedResourceEvent(
369               org.apache.hadoop.mapreduce.TaskType.REDUCE,
370               reduceResourceRequest.getMemory())));
371           LOG.info("reduceResourceRequest:" + reduceResourceRequest);
372           if (reduceResourceRequest.getMemory() > supportedMaxContainerCapability
373             .getMemory()
374               || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
375                 .getVirtualCores()) {
376             String diagMsg =
377                 "REDUCE capability required is more than the "
378                     + "supported max container capability in the cluster. Killing the "
379                     + "Job. reduceResourceRequest: " + reduceResourceRequest
380                     + " maxContainerCapability:"
381                     + supportedMaxContainerCapability;
382             LOG.info(diagMsg);
383             eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
384             eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
385           }
386         }
387         // set the resources
388         reqEvent.getCapability().setMemory(reduceResourceRequest.getMemory());
389         reqEvent.getCapability().setVirtualCores(
390           reduceResourceRequest.getVirtualCores());
391         if (reqEvent.getEarlierAttemptFailed()) {
392           //add to the front of queue for fail fast
393           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
394         } else {
395           pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
396           //reduces are added to pending and are slowly ramped up
397         }
398       }
399 
400     } else if (
401         event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
402 
403       LOG.info("Processing the event " + event.toString());
404 
405       TaskAttemptId aId = event.getAttemptID();
406 
407       boolean removed = scheduledRequests.remove(aId);
408       if (!removed) {
409         ContainerId containerId = assignedRequests.get(aId);
410         if (containerId != null) {
411           removed = true;
412           assignedRequests.remove(aId);
413           containersReleased++;
414           pendingRelease.add(containerId);
415           release(containerId);
416         }
417       }
418       if (!removed) {
419         LOG.error("Could not deallocate container for task attemptId " +
420             aId);
421       }
422     } else if (
423         event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
424       ContainerFailedEvent fEv = (ContainerFailedEvent) event;
425       String host = getHost(fEv.getContMgrAddress());
426       containerFailedOnHost(host);
427     }
428   }
429 
430   private static String getHost(String contMgrAddress) {
431     String host = contMgrAddress;
432     String[] hostport = host.split(":");
433     if (hostport.length == 2) {
434       host = hostport[0];
435     }
436     return host;
437   }
438 
439   @Private
440   @VisibleForTesting
441   synchronized void setReduceResourceRequest(Resource res) {
442     this.reduceResourceRequest = res;
443   }
444 
445   @Private
446   @VisibleForTesting
447   synchronized void setMapResourceRequest(Resource res) {
448     this.mapResourceRequest = res;
449   }
450 
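  // If maps are waiting for containers but the reduces that are scheduled or
  // already running leave no room for even a single map, move all scheduled (not
  // yet assigned) reduces back to pending. If map requests have also been waiting
  // longer than allocationDelayThresholdMs, preempt running reduces as well: at
  // least enough for one map, no more than the hanging maps need, targeting the
  // maxReducePreemptionLimit share of the job's resource limit.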
451   @Private
452   @VisibleForTesting
453   void preemptReducesIfNeeded() {
454     if (reduceResourceRequest.equals(Resources.none())) {
455       return; // no reduces
456     }
457     //check if reduces have taken over the whole cluster and there are
458     //unassigned maps
459     if (scheduledRequests.maps.size() > 0) {
460       Resource resourceLimit = getResourceLimit();
461       Resource availableResourceForMap =
462           Resources.subtract(
463             resourceLimit,
464             Resources.multiply(reduceResourceRequest,
465               assignedRequests.reduces.size()
466                   - assignedRequests.preemptionWaitingReduces.size()));
467       // availableResourceForMap must be sufficient to run at least 1 map
468       if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
469         mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
470         // to make sure new containers are given to maps and not reduces
471         // ramp down all scheduled reduces if any
472         // (since reduces are scheduled at higher priority than maps)
473         LOG.info("Ramping down all scheduled reduces:"
474             + scheduledRequests.reduces.size());
475         for (ContainerRequest req : scheduledRequests.reduces.values()) {
476           pendingReduces.add(req);
477         }
478         scheduledRequests.reduces.clear();
479 
480         //do further checking to find the number of map requests that were
481         //hanging around for a while
482         int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
483         if (hangingMapRequests > 0) {
484           // preempt to make space for at least one map
485           int preemptionReduceNumForOneMap =
486               ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
487                 reduceResourceRequest, getSchedulerResourceTypes());
488           int preemptionReduceNumForPreemptionLimit =
489               ResourceCalculatorUtils.divideAndCeilContainers(
490                 Resources.multiply(resourceLimit, maxReducePreemptionLimit),
491                 reduceResourceRequest, getSchedulerResourceTypes());
492           int preemptionReduceNumForAllMaps =
493               ResourceCalculatorUtils.divideAndCeilContainers(
494                 Resources.multiply(mapResourceRequest, hangingMapRequests),
495                 reduceResourceRequest, getSchedulerResourceTypes());
496           int toPreempt =
497               Math.min(Math.max(preemptionReduceNumForOneMap,
498                 preemptionReduceNumForPreemptionLimit),
499                 preemptionReduceNumForAllMaps);
500 
501           LOG.info("Going to preempt " + toPreempt
502               + " running reduces due to lack of space for maps");
503           assignedRequests.preemptReduce(toPreempt);
504         }
505       }
506     }
507   }
508 
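  // A request is considered hanging once it has waited longer than
  // allocationDelayThresholdMs; if the threshold is disabled (<= 0), every
  // outstanding request counts as hanging.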
509   private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
510     if (allocationDelayThresholdMs <= 0)
511       return requestMap.size();
512     int hangingRequests = 0;
513     long currTime = clock.getTime();
514     for (ContainerRequest request: requestMap.values()) {
515       long delay = currTime - request.requestTimeMs;
516       if (delay > allocationDelayThresholdMs)
517         hangingRequests++;
518     }
519     return hangingRequests;
520   }
521 
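  // Decides how many pending reduces to move to scheduled (ramp up) or how many
  // scheduled reduces to move back to pending (ramp down). Reduces are not started
  // until the reduceSlowStart fraction of maps has completed; after that, the
  // reduce share of the job's resource limit grows with the completed-map
  // percentage, capped by maxReduceRampupLimit.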
522   @Private
523   public void scheduleReduces(
524       int totalMaps, int completedMaps,
525       int scheduledMaps, int scheduledReduces,
526       int assignedMaps, int assignedReduces,
527       Resource mapResourceReqt, Resource reduceResourceReqt,
528       int numPendingReduces,
529       float maxReduceRampupLimit, float reduceSlowStart) {
530 
531     if (numPendingReduces == 0) {
532       return;
533     }
534 
535     // get available resources for this job
536     Resource headRoom = getAvailableResources();
537     if (headRoom == null) {
538       headRoom = Resources.none();
539     }
540 
541     LOG.info("Recalculating schedule, headroom=" + headRoom);
542 
543     //check for slow start
544     if (!getIsReduceStarted()) {//not set yet
545       int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
546                       totalMaps);
547       if(completedMaps < completedMapsForReduceSlowstart) {
548         LOG.info("Reduce slow start threshold not met. " +
549               "completedMapsForReduceSlowstart " +
550             completedMapsForReduceSlowstart);
551         return;
552       } else {
553         LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
554         setIsReduceStarted(true);
555       }
556     }
557 
558     //if all maps are assigned, then ramp up all reduces irrespective of the
559     //headroom
560     if (scheduledMaps == 0 && numPendingReduces > 0) {
561       LOG.info("All maps assigned. " +
562           "Ramping up all remaining reduces:" + numPendingReduces);
563       scheduleAllReduces();
564       return;
565     }
566 
567     float completedMapPercent = 0f;
568     if (totalMaps != 0) {//support for 0 maps
569       completedMapPercent = (float)completedMaps/totalMaps;
570     } else {
571       completedMapPercent = 1;
572     }
573 
574     Resource netScheduledMapResource =
575         Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps));
576 
577     Resource netScheduledReduceResource =
578         Resources.multiply(reduceResourceReqt,
579           (scheduledReduces + assignedReduces));
580 
581     Resource finalMapResourceLimit;
582     Resource finalReduceResourceLimit;
583 
584     // ramp up the reduces based on completed map percentage
585     Resource totalResourceLimit = getResourceLimit();
586 
587     Resource idealReduceResourceLimit =
588         Resources.multiply(totalResourceLimit,
589           Math.min(completedMapPercent, maxReduceRampupLimit));
590     Resource idealMapResourceLimit =
591         Resources.subtract(totalResourceLimit, idealReduceResourceLimit);
592 
593     // If there aren't enough maps scheduled, give the free map capacity to
594     // reduces.
595     // Even when the container counts are equal, there may be unused resources
596     // in one dimension.
597     if (ResourceCalculatorUtils.computeAvailableContainers(idealMapResourceLimit,
598       mapResourceReqt, getSchedulerResourceTypes()) >= (scheduledMaps + assignedMaps)) {
599       // enough resources are given to maps; give the remainder to reduces
600       Resource unusedMapResourceLimit =
601           Resources.subtract(idealMapResourceLimit, netScheduledMapResource);
602       finalReduceResourceLimit =
603           Resources.add(idealReduceResourceLimit, unusedMapResourceLimit);
604       finalMapResourceLimit =
605           Resources.subtract(totalResourceLimit, finalReduceResourceLimit);
606     } else {
607       finalMapResourceLimit = idealMapResourceLimit;
608       finalReduceResourceLimit = idealReduceResourceLimit;
609     }
610 
611     LOG.info("completedMapPercent " + completedMapPercent
612         + " totalResourceLimit:" + totalResourceLimit
613         + " finalMapResourceLimit:" + finalMapResourceLimit
614         + " finalReduceResourceLimit:" + finalReduceResourceLimit
615         + " netScheduledMapResource:" + netScheduledMapResource
616         + " netScheduledReduceResource:" + netScheduledReduceResource);
617 
618     int rampUp =
619         ResourceCalculatorUtils.computeAvailableContainers(Resources.subtract(
620                 finalReduceResourceLimit, netScheduledReduceResource),
621             reduceResourceReqt, getSchedulerResourceTypes());
622 
623     if (rampUp > 0) {
624       rampUp = Math.min(rampUp, numPendingReduces);
625       LOG.info("Ramping up " + rampUp);
626       rampUpReduces(rampUp);
627     } else if (rampUp < 0) {
628       int rampDown = -1 * rampUp;
629       rampDown = Math.min(rampDown, scheduledReduces);
630       LOG.info("Ramping down " + rampDown);
631       rampDownReduces(rampDown);
632     }
633   }
634 
635   @Private
636   public void scheduleAllReduces() {
637     for (ContainerRequest req : pendingReduces) {
638       scheduledRequests.addReduce(req);
639     }
640     pendingReduces.clear();
641   }
642 
643   @Private
644   public void rampUpReduces(int rampUp) {
645     //more reduces to be scheduled
646     for (int i = 0; i < rampUp; i++) {
647       ContainerRequest request = pendingReduces.removeFirst();
648       scheduledRequests.addReduce(request);
649     }
650   }
651 
652   @Private
653   public void rampDownReduces(int rampDown) {
654     //remove from the scheduled and move back to pending
655     for (int i = 0; i < rampDown; i++) {
656       ContainerRequest request = scheduledRequests.removeReduce();
657       pendingReduces.add(request);
658     }
659   }
660 
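  // Performs one allocate call to the RM: applies concurrent task limits, handles
  // RM restart/resync cases, caches NM tokens, picks up a rolled AMRM token if one
  // is returned, forwards completed-container statuses to the task attempts, and
  // returns the newly allocated containers.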
661   @SuppressWarnings("unchecked")
662   private List<Container> getResources() throws Exception {
663     applyConcurrentTaskLimits();
664 
665     // will be null the first time
666     Resource headRoom =
667         getAvailableResources() == null ? Resources.none() :
668             Resources.clone(getAvailableResources());
669     AllocateResponse response;
670     /*
671      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
672      * milliseconds before aborting. During this interval, AM will still try
673      * to contact the RM.
674      */
675     try {
676       response = makeRemoteRequest();
677       // Reset retry count if no exception occurred.
678       retrystartTime = System.currentTimeMillis();
679     } catch (ApplicationAttemptNotFoundException e ) {
680       // This can happen if the RM has been restarted. If it is in that state,
681       // this application must clean itself up.
682       eventHandler.handle(new JobEvent(this.getJob().getID(),
683         JobEventType.JOB_AM_REBOOT));
684       throw new RMContainerAllocationException(
685         "Resource Manager doesn't recognize AttemptId: "
686             + this.getContext().getApplicationAttemptId(), e);
687     } catch (ApplicationMasterNotRegisteredException e) {
688       LOG.info("ApplicationMaster is out of sync with ResourceManager,"
689           + " hence resync and send outstanding requests.");
690       // RM may have restarted, re-register with RM.
691       lastResponseID = 0;
692       register();
693       addOutstandingRequestOnResync();
694       return null;
695     } catch (Exception e) {
696       // This can happen when the connection to the RM has gone down. Keep
697       // re-trying until the retryInterval has expired.
698       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
699         LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
700         eventHandler.handle(new JobEvent(this.getJob().getID(),
701                                          JobEventType.JOB_AM_REBOOT));
702         throw new RMContainerAllocationException("Could not contact RM after " +
703                                 retryInterval + " milliseconds.");
704       }
705       // Throw this up to the caller, which may decide to ignore it and
706       // continue to attempt to contact the RM.
707       throw e;
708     }
709     Resource newHeadRoom =
710         getAvailableResources() == null ? Resources.none()
711             : getAvailableResources();
712     List<Container> newContainers = response.getAllocatedContainers();
713     // Setting NMTokens
714     if (response.getNMTokens() != null) {
715       for (NMToken nmToken : response.getNMTokens()) {
716         NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
717             nmToken.getToken());
718       }
719     }
720 
721     // Setting AMRMToken
722     if (response.getAMRMToken() != null) {
723       updateAMRMToken(response.getAMRMToken());
724     }
725 
726     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
727     if (newContainers.size() + finishedContainers.size() > 0
728         || !headRoom.equals(newHeadRoom)) {
729       //something changed
730       recalculateReduceSchedule = true;
731       if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) {
732         LOG.debug("headroom=" + newHeadRoom);
733       }
734     }
735 
736     if (LOG.isDebugEnabled()) {
737       for (Container cont : newContainers) {
738         LOG.debug("Received new Container :" + cont);
739       }
740     }
741 
742     //Called on each allocation. Will know about newly blacklisted/added hosts.
743     computeIgnoreBlacklisting();
744 
745     handleUpdatedNodes(response);
746 
747     for (ContainerStatus cont : finishedContainers) {
748       LOG.info("Received completed container " + cont.getContainerId());
749       TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
750       if (attemptID == null) {
751         LOG.error("Container complete event for unknown container id "
752             + cont.getContainerId());
753       } else {
754         pendingRelease.remove(cont.getContainerId());
755         assignedRequests.remove(attemptID);
756 
757         // send the container completed event to Task attempt
758         eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
759 
760         // Send the diagnostics
761         String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
762         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
763             diagnostics));
764       }
765     }
766     return newContainers;
767   }
768 
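  // Caps the number of outstanding container requests per priority so that the
  // number of concurrently running tasks stays within maxRunningMaps and
  // maxRunningReduces, giving failed-map requests first claim on the map budget.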
769   private void applyConcurrentTaskLimits() {
770     int numScheduledMaps = scheduledRequests.maps.size();
771     if (maxRunningMaps > 0 && numScheduledMaps > 0) {
772       int maxRequestedMaps = Math.max(0,
773           maxRunningMaps - assignedRequests.maps.size());
774       int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
775       int failedMapRequestLimit = Math.min(maxRequestedMaps,
776           numScheduledFailMaps);
777       int normalMapRequestLimit = Math.min(
778           maxRequestedMaps - failedMapRequestLimit,
779           numScheduledMaps - numScheduledFailMaps);
780       setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
781           failedMapRequestLimit);
782       setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
783     }
784 
785     int numScheduledReduces = scheduledRequests.reduces.size();
786     if (maxRunningReduces > 0 && numScheduledReduces > 0) {
787       int maxRequestedReduces = Math.max(0,
788           maxRunningReduces - assignedRequests.reduces.size());
789       int reduceRequestLimit = Math.min(maxRequestedReduces,
790           numScheduledReduces);
791       setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
792           reduceRequestLimit);
793     }
794   }
795 
796   private boolean canAssignMaps() {
797     return (maxRunningMaps <= 0
798         || assignedRequests.maps.size() < maxRunningMaps);
799   }
800 
801   private boolean canAssignReduces() {
802     return (maxRunningReduces <= 0
803         || assignedRequests.reduces.size() < maxRunningReduces);
804   }
805 
806   private void updateAMRMToken(Token token) throws IOException {
807     org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
808         new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
809           .getIdentifier().array(), token.getPassword().array(), new Text(
810           token.getKind()), new Text(token.getService()));
811     UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
812     currentUGI.addToken(amrmToken);
813     amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
814   }
815 
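  // Containers that exited as ABORTED or PREEMPTED were killed by the framework,
  // so the attempt gets TA_KILL (and may be retried) rather than
  // TA_CONTAINER_COMPLETED.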
816   @VisibleForTesting
817   public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
818       TaskAttemptId attemptID) {
819     if (cont.getExitStatus() == ContainerExitStatus.ABORTED
820         || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
821       // killed by framework
822       return new TaskAttemptEvent(attemptID,
823           TaskAttemptEventType.TA_KILL);
824     } else {
825       return new TaskAttemptEvent(attemptID,
826           TaskAttemptEventType.TA_CONTAINER_COMPLETED);
827     }
828   }
829 
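  // Forwards node updates to the job and kills any task attempt whose container is
  // running on a node the RM now reports as unusable.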
830   @SuppressWarnings("unchecked")
831   private void handleUpdatedNodes(AllocateResponse response) {
832     // send an event to the job about updated nodes
833     List<NodeReport> updatedNodes = response.getUpdatedNodes();
834     if (!updatedNodes.isEmpty()) {
835 
836       // send event to the job to act upon completed tasks
837       eventHandler.handle(new JobUpdatedNodesEvent(getJob().getID(),
838           updatedNodes));
839 
840       // act upon running tasks
841       HashSet<NodeId> unusableNodes = new HashSet<NodeId>();
842       for (NodeReport nr : updatedNodes) {
843         NodeState nodeState = nr.getNodeState();
844         if (nodeState.isUnusable()) {
845           unusableNodes.add(nr.getNodeId());
846         }
847       }
848       for (int i = 0; i < 2; ++i) {
849         HashMap<TaskAttemptId, Container> taskSet = i == 0 ? assignedRequests.maps
850             : assignedRequests.reduces;
851         // kill running containers
852         for (Map.Entry<TaskAttemptId, Container> entry : taskSet.entrySet()) {
853           TaskAttemptId tid = entry.getKey();
854           NodeId taskAttemptNodeId = entry.getValue().getNodeId();
855           if (unusableNodes.contains(taskAttemptNodeId)) {
856             LOG.info("Killing taskAttempt:" + tid
857                 + " because it is running on unusable node:"
858                 + taskAttemptNodeId);
859             eventHandler.handle(new TaskAttemptKillEvent(tid,
860                 "TaskAttempt killed because it ran on unusable node "
861                     + taskAttemptNodeId));
862           }
863         }
864       }
865     }
866   }
867 
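  // The total resource this job can currently use: the headroom reported by the RM
  // plus the resources already held by its assigned map and reduce containers.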
868   @Private
869   public Resource getResourceLimit() {
870     Resource headRoom = getAvailableResources();
871     if (headRoom == null) {
872       headRoom = Resources.none();
873     }
874     Resource assignedMapResource =
875         Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
876     Resource assignedReduceResource =
877         Resources.multiply(reduceResourceRequest,
878           assignedRequests.reduces.size());
879     return Resources.add(headRoom,
880       Resources.add(assignedMapResource, assignedReduceResource));
881   }
882 
883   @Private
884   @VisibleForTesting
885   class ScheduledRequests {
886 
887     private final LinkedList<TaskAttemptId> earlierFailedMaps =
888       new LinkedList<TaskAttemptId>();
889 
890     /** Maps from a host to a list of Map tasks with data on the host */
891     private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping =
892       new HashMap<String, LinkedList<TaskAttemptId>>();
893     private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
894       new HashMap<String, LinkedList<TaskAttemptId>>();
895     @VisibleForTesting
896     final Map<TaskAttemptId, ContainerRequest> maps =
897       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
898 
899     private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
900       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
901 
902     boolean remove(TaskAttemptId tId) {
903       ContainerRequest req = null;
904       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
905         req = maps.remove(tId);
906       } else {
907         req = reduces.remove(tId);
908       }
909 
910       if (req == null) {
911         return false;
912       } else {
913         decContainerReq(req);
914         return true;
915       }
916     }
917 
918     ContainerRequest removeReduce() {
919       Iterator<Entry<TaskAttemptId, ContainerRequest>> it = reduces.entrySet().iterator();
920       if (it.hasNext()) {
921         Entry<TaskAttemptId, ContainerRequest> entry = it.next();
922         it.remove();
923         decContainerReq(entry.getValue());
924         return entry.getValue();
925       }
926       return null;
927     }
928 
929     void addMap(ContainerRequestEvent event) {
930       ContainerRequest request = null;
931 
932       if (event.getEarlierAttemptFailed()) {
933         earlierFailedMaps.add(event.getAttemptID());
934         request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
935         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
936       } else {
937         for (String host : event.getHosts()) {
938           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
939           if (list == null) {
940             list = new LinkedList<TaskAttemptId>();
941             mapsHostMapping.put(host, list);
942           }
943           list.add(event.getAttemptID());
944           if (LOG.isDebugEnabled()) {
945             LOG.debug("Added attempt req to host " + host);
946           }
947        }
948        for (String rack: event.getRacks()) {
949          LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
950          if (list == null) {
951            list = new LinkedList<TaskAttemptId>();
952            mapsRackMapping.put(rack, list);
953          }
954          list.add(event.getAttemptID());
955          if (LOG.isDebugEnabled()) {
956             LOG.debug("Added attempt req to rack " + rack);
957          }
958        }
959        request = new ContainerRequest(event, PRIORITY_MAP);
960       }
961       maps.put(event.getAttemptID(), request);
962       addContainerReq(request);
963     }
964 
965 
966     void addReduce(ContainerRequest req) {
967       reduces.put(req.attemptID, req);
968       addContainerReq(req);
969     }
970 
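    // First pass over the allocated containers: release any that cannot be used
    // (too small for the request, no matching pending task, an unexpected
    // priority, or a blacklisted host), then hand the rest to assignContainers()
    // and release whatever is still unassigned afterwards.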
971     // this method will change the list of allocatedContainers.
972     private void assign(List<Container> allocatedContainers) {
973       Iterator<Container> it = allocatedContainers.iterator();
974       LOG.info("Got allocated containers " + allocatedContainers.size());
975       containersAllocated += allocatedContainers.size();
976       while (it.hasNext()) {
977         Container allocated = it.next();
978         if (LOG.isDebugEnabled()) {
979           LOG.debug("Assigning container " + allocated.getId()
980               + " with priority " + allocated.getPriority() + " to NM "
981               + allocated.getNodeId());
982         }
983 
984         // check if allocated container meets memory requirements
985         // and whether we have any scheduled tasks that need
986         // a container to be assigned
987         boolean isAssignable = true;
988         Priority priority = allocated.getPriority();
989         Resource allocatedResource = allocated.getResource();
990         if (PRIORITY_FAST_FAIL_MAP.equals(priority)
991             || PRIORITY_MAP.equals(priority)) {
992           if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
993               mapResourceRequest, getSchedulerResourceTypes()) <= 0
994               || maps.isEmpty()) {
995             LOG.info("Cannot assign container " + allocated
996                 + " for a map as either "
997                 + " container memory less than required " + mapResourceRequest
998                 + " or no pending map tasks - maps.isEmpty="
999                 + maps.isEmpty());
1000             isAssignable = false;
1001           }
1002         }
1003         else if (PRIORITY_REDUCE.equals(priority)) {
1004           if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
1005               reduceResourceRequest, getSchedulerResourceTypes()) <= 0
1006               || reduces.isEmpty()) {
1007             LOG.info("Cannot assign container " + allocated
1008                 + " for a reduce as either "
1009                 + " container memory less than required " + reduceResourceRequest
1010                 + " or no pending reduce tasks - reduces.isEmpty="
1011                 + reduces.isEmpty());
1012             isAssignable = false;
1013           }
1014         } else {
1015           LOG.warn("Container allocated at unwanted priority: " + priority +
1016               ". Returning to RM...");
1017           isAssignable = false;
1018         }
1019 
1020         if(!isAssignable) {
1021           // release container if we could not assign it
1022           containerNotAssigned(allocated);
1023           it.remove();
1024           continue;
1025         }
1026 
1027         // do not assign if allocated container is on a
1028         // blacklisted host
1029         String allocatedHost = allocated.getNodeId().getHost();
1030         if (isNodeBlacklisted(allocatedHost)) {
1031           // we need to request for a new container
1032           // and release the current one
1033           LOG.info("Got allocated container on a blacklisted "
1034               + " host "+allocatedHost
1035               +". Releasing container " + allocated);
1036 
1037           // find the request matching this allocated container
1038           // and replace it with a new one
1039           ContainerRequest toBeReplacedReq =
1040               getContainerReqToReplace(allocated);
1041           if (toBeReplacedReq != null) {
1042             LOG.info("Placing a new container request for task attempt "
1043                 + toBeReplacedReq.attemptID);
1044             ContainerRequest newReq =
1045                 getFilteredContainerRequest(toBeReplacedReq);
1046             decContainerReq(toBeReplacedReq);
1047             if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
1048                 TaskType.MAP) {
1049               maps.put(newReq.attemptID, newReq);
1050             }
1051             else {
1052               reduces.put(newReq.attemptID, newReq);
1053             }
1054             addContainerReq(newReq);
1055           }
1056           else {
1057             LOG.info("Could not map allocated container to a valid request."
1058                 + " Releasing allocated container " + allocated);
1059           }
1060 
1061           // release container if we could not assign it
1062           containerNotAssigned(allocated);
1063           it.remove();
1064           continue;
1065         }
1066       }
1067 
1068       assignContainers(allocatedContainers);
1069 
1070       // release container if we could not assign it
1071       it = allocatedContainers.iterator();
1072       while (it.hasNext()) {
1073         Container allocated = it.next();
1074         LOG.info("Releasing unassigned container " + allocated);
1075         containerNotAssigned(allocated);
1076       }
1077     }
1078 
1079     @SuppressWarnings("unchecked")
1080     private void containerAssigned(Container allocated,
1081                                     ContainerRequest assigned) {
1082       // Update resource requests
1083       decContainerReq(assigned);
1084 
1085       // send the container-assigned event to task attempt
1086       eventHandler.handle(new TaskAttemptContainerAssignedEvent(
1087           assigned.attemptID, allocated, applicationACLs));
1088 
1089       assignedRequests.add(allocated, assigned.attemptID);
1090 
1091       if (LOG.isDebugEnabled()) {
1092         LOG.debug("Assigned container (" + allocated + ") "
1093             + " to task " + assigned.attemptID + " on node "
1094             + allocated.getNodeId().toString());
1095       }
1096     }
1097 
1098     private void containerNotAssigned(Container allocated) {
1099       containersReleased++;
1100       pendingRelease.add(allocated.getId());
1101       release(allocated.getId());
1102     }
1103 
1104     private ContainerRequest assignWithoutLocality(Container allocated) {
1105       ContainerRequest assigned = null;
1106 
1107       Priority priority = allocated.getPriority();
1108       if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
1109         LOG.info("Assigning container " + allocated + " to fast fail map");
1110         assigned = assignToFailedMap(allocated);
1111       } else if (PRIORITY_REDUCE.equals(priority)) {
1112         if (LOG.isDebugEnabled()) {
1113           LOG.debug("Assigning container " + allocated + " to reduce");
1114         }
1115         assigned = assignToReduce(allocated);
1116       }
1117 
1118       return assigned;
1119     }
1120 
1121     private void assignContainers(List<Container> allocatedContainers) {
1122       Iterator<Container> it = allocatedContainers.iterator();
1123       while (it.hasNext()) {
1124         Container allocated = it.next();
1125         ContainerRequest assigned = assignWithoutLocality(allocated);
1126         if (assigned != null) {
1127           containerAssigned(allocated, assigned);
1128           it.remove();
1129         }
1130       }
1131 
1132       assignMapsWithLocality(allocatedContainers);
1133     }
1134 
1135     private ContainerRequest getContainerReqToReplace(Container allocated) {
1136       LOG.info("Finding containerReq for allocated container: " + allocated);
1137       Priority priority = allocated.getPriority();
1138       ContainerRequest toBeReplaced = null;
1139       if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
1140         LOG.info("Replacing FAST_FAIL_MAP container " + allocated.getId());
1141         Iterator<TaskAttemptId> iter = earlierFailedMaps.iterator();
1142         while (toBeReplaced == null && iter.hasNext()) {
1143           toBeReplaced = maps.get(iter.next());
1144         }
1145         LOG.info("Found replacement: " + toBeReplaced);
1146         return toBeReplaced;
1147       }
1148       else if (PRIORITY_MAP.equals(priority)) {
1149         LOG.info("Replacing MAP container " + allocated.getId());
1150         // allocated container was for a map
1151         String host = allocated.getNodeId().getHost();
1152         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
1153         if (list != null && list.size() > 0) {
1154           TaskAttemptId tId = list.removeLast();
1155           if (maps.containsKey(tId)) {
1156             toBeReplaced = maps.remove(tId);
1157           }
1158         }
1159         else {
1160           TaskAttemptId tId = maps.keySet().iterator().next();
1161           toBeReplaced = maps.remove(tId);
1162         }
1163       }
1164       else if (PRIORITY_REDUCE.equals(priority)) {
1165         TaskAttemptId tId = reduces.keySet().iterator().next();
1166         toBeReplaced = reduces.remove(tId);
1167       }
1168       LOG.info("Found replacement: " + toBeReplaced);
1169       return toBeReplaced;
1170     }
1171 
1172 
1173     @SuppressWarnings("unchecked")
1174     private ContainerRequest assignToFailedMap(Container allocated) {
1175       //try to assign to earlierFailedMaps if present
1176       ContainerRequest assigned = null;
1177       while (assigned == null && earlierFailedMaps.size() > 0
1178           && canAssignMaps()) {
1179         TaskAttemptId tId = earlierFailedMaps.removeFirst();
1180         if (maps.containsKey(tId)) {
1181           assigned = maps.remove(tId);
1182           JobCounterUpdateEvent jce =
1183             new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
1184           jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
1185           eventHandler.handle(jce);
1186           LOG.info("Assigned from earlierFailedMaps");
1187           break;
1188         }
1189       }
1190       return assigned;
1191     }
1192 
1193     private ContainerRequest assignToReduce(Container allocated) {
1194       ContainerRequest assigned = null;
1195       //try to assign to reduces if present
1196       if (assigned == null && reduces.size() > 0 && canAssignReduces()) {
1197         TaskAttemptId tId = reduces.keySet().iterator().next();
1198         assigned = reduces.remove(tId);
1199         LOG.info("Assigned to reduce");
1200       }
1201       return assigned;
1202     }
1203 
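    // Assign map containers in three passes: node-local first, then rack-local,
    // then any remaining container to any pending map, updating the
    // DATA_LOCAL_MAPS / RACK_LOCAL_MAPS / OTHER_LOCAL_MAPS counters accordingly.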
1204     @SuppressWarnings("unchecked")
1205     private void assignMapsWithLocality(List<Container> allocatedContainers) {
1206       // try to assign to all nodes first to match node local
1207       Iterator<Container> it = allocatedContainers.iterator();
1208       while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
1209         Container allocated = it.next();
1210         Priority priority = allocated.getPriority();
1211         assert PRIORITY_MAP.equals(priority);
1212         // "if (maps.containsKey(tId))" below should be almost always true.
1213         // hence this while loop would almost always have O(1) complexity
1214         String host = allocated.getNodeId().getHost();
1215         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
1216         while (list != null && list.size() > 0) {
1217           if (LOG.isDebugEnabled()) {
1218             LOG.debug("Host matched to the request list " + host);
1219           }
1220           TaskAttemptId tId = list.removeFirst();
1221           if (maps.containsKey(tId)) {
1222             ContainerRequest assigned = maps.remove(tId);
1223             containerAssigned(allocated, assigned);
1224             it.remove();
1225             JobCounterUpdateEvent jce =
1226               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
1227             jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
1228             eventHandler.handle(jce);
1229             hostLocalAssigned++;
1230             if (LOG.isDebugEnabled()) {
1231               LOG.debug("Assigned based on host match " + host);
1232             }
1233             break;
1234           }
1235         }
1236       }
1237 
1238       // try to match all rack local
1239       it = allocatedContainers.iterator();
1240       while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
1241         Container allocated = it.next();
1242         Priority priority = allocated.getPriority();
1243         assert PRIORITY_MAP.equals(priority);
1244         // "if (maps.containsKey(tId))" below should be almost always true.
1245         // hence this while loop would almost always have O(1) complexity
1246         String host = allocated.getNodeId().getHost();
1247         String rack = RackResolver.resolve(host).getNetworkLocation();
1248         LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
1249         while (list != null && list.size() > 0) {
1250           TaskAttemptId tId = list.removeFirst();
1251           if (maps.containsKey(tId)) {
1252             ContainerRequest assigned = maps.remove(tId);
1253             containerAssigned(allocated, assigned);
1254             it.remove();
1255             JobCounterUpdateEvent jce =
1256               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
1257             jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
1258             eventHandler.handle(jce);
1259             rackLocalAssigned++;
1260             if (LOG.isDebugEnabled()) {
1261               LOG.debug("Assigned based on rack match " + rack);
1262             }
1263             break;
1264           }
1265         }
1266       }
1267 
1268       // assign remaining
1269       it = allocatedContainers.iterator();
1270       while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
1271         Container allocated = it.next();
1272         Priority priority = allocated.getPriority();
1273         assert PRIORITY_MAP.equals(priority);
1274         TaskAttemptId tId = maps.keySet().iterator().next();
1275         ContainerRequest assigned = maps.remove(tId);
1276         containerAssigned(allocated, assigned);
1277         it.remove();
1278         JobCounterUpdateEvent jce =
1279           new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
1280         jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
1281         eventHandler.handle(jce);
1282         if (LOG.isDebugEnabled()) {
1283           LOG.debug("Assigned based on * match");
1284         }
1285       }
1286     }
1287   }
1288 
1289   @Private
1290   @VisibleForTesting
1291   class AssignedRequests {
1292     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
1293       new HashMap<ContainerId, TaskAttemptId>();
1294     private final LinkedHashMap<TaskAttemptId, Container> maps =
1295       new LinkedHashMap<TaskAttemptId, Container>();
1296     @VisibleForTesting
1297     final LinkedHashMap<TaskAttemptId, Container> reduces =
1298       new LinkedHashMap<TaskAttemptId, Container>();
1299     @VisibleForTesting
1300     final Set<TaskAttemptId> preemptionWaitingReduces =
1301       new HashSet<TaskAttemptId>();
1302 
1303     void add(Container container, TaskAttemptId tId) {
1304       LOG.info("Assigned container " + container.getId().toString() + " to " + tId);
1305       containerToAttemptMap.put(container.getId(), tId);
1306       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
1307         maps.put(tId, container);
1308       } else {
1309         reduces.put(tId, container);
1310       }
1311     }
1312 
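    // Preempt the reduces that have made the least progress first, so the least
    // amount of completed work is thrown away.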
1313     @SuppressWarnings("unchecked")
1314     void preemptReduce(int toPreempt) {
1315       List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
1316         (reduces.keySet());
1317       //sort reduces on progress
1318       Collections.sort(reduceList,
1319           new Comparator<TaskAttemptId>() {
1320         @Override
1321         public int compare(TaskAttemptId o1, TaskAttemptId o2) {
1322           return Float.compare(
1323               getJob().getTask(o1.getTaskId()).getAttempt(o1).getProgress(),
1324               getJob().getTask(o2.getTaskId()).getAttempt(o2).getProgress());
1325         }
1326       });
1327 
1328       for (int i = 0; i < toPreempt && reduceList.size() > 0; i++) {
1329         TaskAttemptId id = reduceList.remove(0);//remove the one on top
1330         LOG.info("Preempting " + id);
1331         preemptionWaitingReduces.add(id);
1332         eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC));
1333       }
1334     }
1335 
1336     boolean remove(TaskAttemptId tId) {
1337       ContainerId containerId = null;
1338       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
1339         containerId = maps.remove(tId).getId();
1340       } else {
1341         containerId = reduces.remove(tId).getId();
1342         if (containerId != null) {
1343           boolean preempted = preemptionWaitingReduces.remove(tId);
1344           if (preempted) {
1345             LOG.info("Reduce preemption successful " + tId);
1346           }
1347         }
1348       }
1349 
1350       if (containerId != null) {
1351         containerToAttemptMap.remove(containerId);
1352         return true;
1353       }
1354       return false;
1355     }
1356 
1357     TaskAttemptId get(ContainerId cId) {
1358       return containerToAttemptMap.get(cId);
1359     }
1360 
1361     ContainerId get(TaskAttemptId tId) {
1362       Container taskContainer;
1363       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
1364         taskContainer = maps.get(tId);
1365       } else {
1366         taskContainer = reduces.get(tId);
1367       }
1368 
1369       if (taskContainer == null) {
1370         return null;
1371       } else {
1372         return taskContainer.getId();
1373       }
1374     }
1375   }
1376 
1377   private class ScheduleStats {
1378     int numPendingReduces;
1379     int numScheduledMaps;
1380     int numScheduledReduces;
1381     int numAssignedMaps;
1382     int numAssignedReduces;
1383     int numCompletedMaps;
1384     int numCompletedReduces;
1385     int numContainersAllocated;
1386     int numContainersReleased;
1387 
1388     public void updateAndLogIfChanged(String msgPrefix) {
1389       boolean changed = false;
1390 
1391       // synchronized to fix findbugs warnings
1392       synchronized (RMContainerAllocator.this) {
1393         changed |= (numPendingReduces != pendingReduces.size());
1394         numPendingReduces = pendingReduces.size();
1395         changed |= (numScheduledMaps != scheduledRequests.maps.size());
1396         numScheduledMaps = scheduledRequests.maps.size();
1397         changed |= (numScheduledReduces != scheduledRequests.reduces.size());
1398         numScheduledReduces = scheduledRequests.reduces.size();
1399         changed |= (numAssignedMaps != assignedRequests.maps.size());
1400         numAssignedMaps = assignedRequests.maps.size();
1401         changed |= (numAssignedReduces != assignedRequests.reduces.size());
1402         numAssignedReduces = assignedRequests.reduces.size();
1403         changed |= (numCompletedMaps != getJob().getCompletedMaps());
1404         numCompletedMaps = getJob().getCompletedMaps();
1405         changed |= (numCompletedReduces != getJob().getCompletedReduces());
1406         numCompletedReduces = getJob().getCompletedReduces();
1407         changed |= (numContainersAllocated != containersAllocated);
1408         numContainersAllocated = containersAllocated;
1409         changed |= (numContainersReleased != containersReleased);
1410         numContainersReleased = containersReleased;
1411       }
1412 
1413       if (changed) {
1414         log(msgPrefix);
1415       }
1416     }
1417 
1418     public void log(String msgPrefix) {
1419         LOG.info(msgPrefix + "PendingReds:" + numPendingReduces +
1420         " ScheduledMaps:" + numScheduledMaps +
1421         " ScheduledReds:" + numScheduledReduces +
1422         " AssignedMaps:" + numAssignedMaps +
1423         " AssignedReds:" + numAssignedReduces +
1424         " CompletedMaps:" + numCompletedMaps +
1425         " CompletedReds:" + numCompletedReduces +
1426         " ContAlloc:" + numContainersAllocated +
1427         " ContRel:" + numContainersReleased +
1428         " HostLocal:" + hostLocalAssigned +
1429         " RackLocal:" + rackLocalAssigned);
1430     }
1431   }
1432 }
1433