1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.v2.app.rm; 20 21 import java.io.IOException; 22 import java.util.ArrayList; 23 import java.util.Collections; 24 import java.util.Comparator; 25 import java.util.HashMap; 26 import java.util.HashSet; 27 import java.util.Iterator; 28 import java.util.LinkedHashMap; 29 import java.util.LinkedList; 30 import java.util.List; 31 import java.util.Map; 32 import java.util.Map.Entry; 33 import java.util.Set; 34 import java.util.concurrent.BlockingQueue; 35 import java.util.concurrent.LinkedBlockingQueue; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.apache.hadoop.classification.InterfaceAudience.Private; 41 import org.apache.hadoop.conf.Configuration; 42 import org.apache.hadoop.io.Text; 43 import org.apache.hadoop.mapreduce.JobCounter; 44 import org.apache.hadoop.mapreduce.MRJobConfig; 45 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; 46 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; 47 import org.apache.hadoop.mapreduce.v2.api.records.JobId; 48 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; 49 import org.apache.hadoop.mapreduce.v2.api.records.TaskType; 50 import org.apache.hadoop.mapreduce.v2.app.AppContext; 51 import org.apache.hadoop.mapreduce.v2.app.client.ClientService; 52 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; 53 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; 54 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; 55 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; 56 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; 57 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; 58 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; 59 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; 60 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; 61 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; 62 import org.apache.hadoop.security.UserGroupInformation; 63 import org.apache.hadoop.util.StringInterner; 64 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 65 import org.apache.hadoop.yarn.api.records.Container; 66 import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 67 import org.apache.hadoop.yarn.api.records.ContainerId; 68 import org.apache.hadoop.yarn.api.records.ContainerStatus; 69 import org.apache.hadoop.yarn.api.records.NMToken; 70 import org.apache.hadoop.yarn.api.records.NodeId; 71 import org.apache.hadoop.yarn.api.records.NodeReport; 72 import org.apache.hadoop.yarn.api.records.NodeState; 73 import org.apache.hadoop.yarn.api.records.Priority; 74 import org.apache.hadoop.yarn.api.records.Resource; 75 import org.apache.hadoop.yarn.api.records.Token; 76 import org.apache.hadoop.yarn.client.ClientRMProxy; 77 import org.apache.hadoop.yarn.client.api.NMTokenCache; 78 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; 79 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; 80 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 81 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 82 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 83 import org.apache.hadoop.yarn.util.Clock; 84 import org.apache.hadoop.yarn.util.RackResolver; 85 import org.apache.hadoop.yarn.util.resource.Resources; 86 87 import com.google.common.annotations.VisibleForTesting; 88 89 /** 90 * Allocates the container from the ResourceManager scheduler. 91 */ 92 public class RMContainerAllocator extends RMContainerRequestor 93 implements ContainerAllocator { 94 95 static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); 96 97 public static final 98 float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; 99 100 static final Priority PRIORITY_FAST_FAIL_MAP; 101 static final Priority PRIORITY_REDUCE; 102 static final Priority PRIORITY_MAP; 103 104 @VisibleForTesting 105 public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted " 106 + "to make room for pending map attempts"; 107 108 private Thread eventHandlingThread; 109 private final AtomicBoolean stopped; 110 111 static { 112 PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); 113 PRIORITY_FAST_FAIL_MAP.setPriority(5); 114 PRIORITY_REDUCE = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); 115 PRIORITY_REDUCE.setPriority(10); 116 PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); 117 PRIORITY_MAP.setPriority(20); 118 } 119 120 /* 121 Vocabulary Used: 122 pending -> requests which are NOT yet sent to RM 123 scheduled -> requests which are sent to RM but not yet assigned 124 assigned -> requests which are assigned to a container 125 completed -> request corresponding to which container has completed 126 127 Lifecycle of map 128 scheduled->assigned->completed 129 130 Lifecycle of reduce 131 pending->scheduled->assigned->completed 132 133 Maps are scheduled as soon as their requests are received. Reduces are 134 added to the pending and are ramped up (added to scheduled) based 135 on completed maps and current availability in the cluster. 136 */ 137 138 //reduces which are not yet scheduled 139 private final LinkedList<ContainerRequest> pendingReduces = 140 new LinkedList<ContainerRequest>(); 141 142 //holds information about the assigned containers to task attempts 143 private final AssignedRequests assignedRequests = new AssignedRequests(); 144 145 //holds scheduled requests to be fulfilled by RM 146 private final ScheduledRequests scheduledRequests = new ScheduledRequests(); 147 148 private int containersAllocated = 0; 149 private int containersReleased = 0; 150 private int hostLocalAssigned = 0; 151 private int rackLocalAssigned = 0; 152 private int lastCompletedTasks = 0; 153 154 private boolean recalculateReduceSchedule = false; 155 private Resource mapResourceRequest = Resources.none(); 156 private Resource reduceResourceRequest = Resources.none(); 157 158 private boolean reduceStarted = false; 159 private float maxReduceRampupLimit = 0; 160 private float maxReducePreemptionLimit = 0; 161 /** 162 * after this threshold, if the container request is not allocated, it is 163 * considered delayed. 164 */ 165 private long allocationDelayThresholdMs = 0; 166 private float reduceSlowStart = 0; 167 private int maxRunningMaps = 0; 168 private int maxRunningReduces = 0; 169 private long retryInterval; 170 private long retrystartTime; 171 private Clock clock; 172 173 @VisibleForTesting 174 protected BlockingQueue<ContainerAllocatorEvent> eventQueue 175 = new LinkedBlockingQueue<ContainerAllocatorEvent>(); 176 177 private ScheduleStats scheduleStats = new ScheduleStats(); 178 RMContainerAllocator(ClientService clientService, AppContext context)179 public RMContainerAllocator(ClientService clientService, AppContext context) { 180 super(clientService, context); 181 this.stopped = new AtomicBoolean(false); 182 this.clock = context.getClock(); 183 } 184 185 @Override serviceInit(Configuration conf)186 protected void serviceInit(Configuration conf) throws Exception { 187 super.serviceInit(conf); 188 reduceSlowStart = conf.getFloat( 189 MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 190 DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART); 191 maxReduceRampupLimit = conf.getFloat( 192 MRJobConfig.MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT, 193 MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT); 194 maxReducePreemptionLimit = conf.getFloat( 195 MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, 196 MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); 197 allocationDelayThresholdMs = conf.getInt( 198 MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, 199 MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms 200 maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, 201 MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT); 202 maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, 203 MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT); 204 RackResolver.init(conf); 205 retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 206 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); 207 // Init startTime to current time. If all goes well, it will be reset after 208 // first attempt to contact RM. 209 retrystartTime = System.currentTimeMillis(); 210 } 211 212 @Override serviceStart()213 protected void serviceStart() throws Exception { 214 this.eventHandlingThread = new Thread() { 215 @SuppressWarnings("unchecked") 216 @Override 217 public void run() { 218 219 ContainerAllocatorEvent event; 220 221 while (!stopped.get() && !Thread.currentThread().isInterrupted()) { 222 try { 223 event = RMContainerAllocator.this.eventQueue.take(); 224 } catch (InterruptedException e) { 225 if (!stopped.get()) { 226 LOG.error("Returning, interrupted : " + e); 227 } 228 return; 229 } 230 231 try { 232 handleEvent(event); 233 } catch (Throwable t) { 234 LOG.error("Error in handling event type " + event.getType() 235 + " to the ContainreAllocator", t); 236 // Kill the AM 237 eventHandler.handle(new JobEvent(getJob().getID(), 238 JobEventType.INTERNAL_ERROR)); 239 return; 240 } 241 } 242 } 243 }; 244 this.eventHandlingThread.start(); 245 super.serviceStart(); 246 } 247 248 @Override heartbeat()249 protected synchronized void heartbeat() throws Exception { 250 scheduleStats.updateAndLogIfChanged("Before Scheduling: "); 251 List<Container> allocatedContainers = getResources(); 252 if (allocatedContainers != null && allocatedContainers.size() > 0) { 253 scheduledRequests.assign(allocatedContainers); 254 } 255 256 int completedMaps = getJob().getCompletedMaps(); 257 int completedTasks = completedMaps + getJob().getCompletedReduces(); 258 if ((lastCompletedTasks != completedTasks) || 259 (scheduledRequests.maps.size() > 0)) { 260 lastCompletedTasks = completedTasks; 261 recalculateReduceSchedule = true; 262 } 263 264 if (recalculateReduceSchedule) { 265 preemptReducesIfNeeded(); 266 scheduleReduces( 267 getJob().getTotalMaps(), completedMaps, 268 scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 269 assignedRequests.maps.size(), assignedRequests.reduces.size(), 270 mapResourceRequest, reduceResourceRequest, 271 pendingReduces.size(), 272 maxReduceRampupLimit, reduceSlowStart); 273 recalculateReduceSchedule = false; 274 } 275 276 scheduleStats.updateAndLogIfChanged("After Scheduling: "); 277 } 278 279 @Override serviceStop()280 protected void serviceStop() throws Exception { 281 if (stopped.getAndSet(true)) { 282 // return if already stopped 283 return; 284 } 285 if (eventHandlingThread != null) { 286 eventHandlingThread.interrupt(); 287 } 288 super.serviceStop(); 289 scheduleStats.log("Final Stats: "); 290 } 291 292 @Private 293 @VisibleForTesting getAssignedRequests()294 AssignedRequests getAssignedRequests() { 295 return assignedRequests; 296 } 297 298 @Private 299 @VisibleForTesting getScheduledRequests()300 ScheduledRequests getScheduledRequests() { 301 return scheduledRequests; 302 } 303 getIsReduceStarted()304 public boolean getIsReduceStarted() { 305 return reduceStarted; 306 } 307 setIsReduceStarted(boolean reduceStarted)308 public void setIsReduceStarted(boolean reduceStarted) { 309 this.reduceStarted = reduceStarted; 310 } 311 312 @Override handle(ContainerAllocatorEvent event)313 public void handle(ContainerAllocatorEvent event) { 314 int qSize = eventQueue.size(); 315 if (qSize != 0 && qSize % 1000 == 0) { 316 LOG.info("Size of event-queue in RMContainerAllocator is " + qSize); 317 } 318 int remCapacity = eventQueue.remainingCapacity(); 319 if (remCapacity < 1000) { 320 LOG.warn("Very low remaining capacity in the event-queue " 321 + "of RMContainerAllocator: " + remCapacity); 322 } 323 try { 324 eventQueue.put(event); 325 } catch (InterruptedException e) { 326 throw new YarnRuntimeException(e); 327 } 328 } 329 330 @SuppressWarnings({ "unchecked" }) handleEvent(ContainerAllocatorEvent event)331 protected synchronized void handleEvent(ContainerAllocatorEvent event) { 332 recalculateReduceSchedule = true; 333 if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { 334 ContainerRequestEvent reqEvent = (ContainerRequestEvent) event; 335 JobId jobId = getJob().getID(); 336 Resource supportedMaxContainerCapability = getMaxContainerCapability(); 337 if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) { 338 if (mapResourceRequest.equals(Resources.none())) { 339 mapResourceRequest = reqEvent.getCapability(); 340 eventHandler.handle(new JobHistoryEvent(jobId, 341 new NormalizedResourceEvent( 342 org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest 343 .getMemory()))); 344 LOG.info("mapResourceRequest:" + mapResourceRequest); 345 if (mapResourceRequest.getMemory() > supportedMaxContainerCapability 346 .getMemory() 347 || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability 348 .getVirtualCores()) { 349 String diagMsg = 350 "MAP capability required is more than the supported " 351 + "max container capability in the cluster. Killing the Job. mapResourceRequest: " 352 + mapResourceRequest + " maxContainerCapability:" 353 + supportedMaxContainerCapability; 354 LOG.info(diagMsg); 355 eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg)); 356 eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); 357 } 358 } 359 // set the resources 360 reqEvent.getCapability().setMemory(mapResourceRequest.getMemory()); 361 reqEvent.getCapability().setVirtualCores( 362 mapResourceRequest.getVirtualCores()); 363 scheduledRequests.addMap(reqEvent);//maps are immediately scheduled 364 } else { 365 if (reduceResourceRequest.equals(Resources.none())) { 366 reduceResourceRequest = reqEvent.getCapability(); 367 eventHandler.handle(new JobHistoryEvent(jobId, 368 new NormalizedResourceEvent( 369 org.apache.hadoop.mapreduce.TaskType.REDUCE, 370 reduceResourceRequest.getMemory()))); 371 LOG.info("reduceResourceRequest:" + reduceResourceRequest); 372 if (reduceResourceRequest.getMemory() > supportedMaxContainerCapability 373 .getMemory() 374 || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability 375 .getVirtualCores()) { 376 String diagMsg = 377 "REDUCE capability required is more than the " 378 + "supported max container capability in the cluster. Killing the " 379 + "Job. reduceResourceRequest: " + reduceResourceRequest 380 + " maxContainerCapability:" 381 + supportedMaxContainerCapability; 382 LOG.info(diagMsg); 383 eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg)); 384 eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); 385 } 386 } 387 // set the resources 388 reqEvent.getCapability().setMemory(reduceResourceRequest.getMemory()); 389 reqEvent.getCapability().setVirtualCores( 390 reduceResourceRequest.getVirtualCores()); 391 if (reqEvent.getEarlierAttemptFailed()) { 392 //add to the front of queue for fail fast 393 pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); 394 } else { 395 pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); 396 //reduces are added to pending and are slowly ramped up 397 } 398 } 399 400 } else if ( 401 event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) { 402 403 LOG.info("Processing the event " + event.toString()); 404 405 TaskAttemptId aId = event.getAttemptID(); 406 407 boolean removed = scheduledRequests.remove(aId); 408 if (!removed) { 409 ContainerId containerId = assignedRequests.get(aId); 410 if (containerId != null) { 411 removed = true; 412 assignedRequests.remove(aId); 413 containersReleased++; 414 pendingRelease.add(containerId); 415 release(containerId); 416 } 417 } 418 if (!removed) { 419 LOG.error("Could not deallocate container for task attemptId " + 420 aId); 421 } 422 } else if ( 423 event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) { 424 ContainerFailedEvent fEv = (ContainerFailedEvent) event; 425 String host = getHost(fEv.getContMgrAddress()); 426 containerFailedOnHost(host); 427 } 428 } 429 getHost(String contMgrAddress)430 private static String getHost(String contMgrAddress) { 431 String host = contMgrAddress; 432 String[] hostport = host.split(":"); 433 if (hostport.length == 2) { 434 host = hostport[0]; 435 } 436 return host; 437 } 438 439 @Private 440 @VisibleForTesting setReduceResourceRequest(Resource res)441 synchronized void setReduceResourceRequest(Resource res) { 442 this.reduceResourceRequest = res; 443 } 444 445 @Private 446 @VisibleForTesting setMapResourceRequest(Resource res)447 synchronized void setMapResourceRequest(Resource res) { 448 this.mapResourceRequest = res; 449 } 450 451 @Private 452 @VisibleForTesting preemptReducesIfNeeded()453 void preemptReducesIfNeeded() { 454 if (reduceResourceRequest.equals(Resources.none())) { 455 return; // no reduces 456 } 457 //check if reduces have taken over the whole cluster and there are 458 //unassigned maps 459 if (scheduledRequests.maps.size() > 0) { 460 Resource resourceLimit = getResourceLimit(); 461 Resource availableResourceForMap = 462 Resources.subtract( 463 resourceLimit, 464 Resources.multiply(reduceResourceRequest, 465 assignedRequests.reduces.size() 466 - assignedRequests.preemptionWaitingReduces.size())); 467 // availableMemForMap must be sufficient to run at least 1 map 468 if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, 469 mapResourceRequest, getSchedulerResourceTypes()) <= 0) { 470 // to make sure new containers are given to maps and not reduces 471 // ramp down all scheduled reduces if any 472 // (since reduces are scheduled at higher priority than maps) 473 LOG.info("Ramping down all scheduled reduces:" 474 + scheduledRequests.reduces.size()); 475 for (ContainerRequest req : scheduledRequests.reduces.values()) { 476 pendingReduces.add(req); 477 } 478 scheduledRequests.reduces.clear(); 479 480 //do further checking to find the number of map requests that were 481 //hanging around for a while 482 int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps); 483 if (hangingMapRequests > 0) { 484 // preempt for making space for at least one map 485 int preemptionReduceNumForOneMap = 486 ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest, 487 reduceResourceRequest, getSchedulerResourceTypes()); 488 int preemptionReduceNumForPreemptionLimit = 489 ResourceCalculatorUtils.divideAndCeilContainers( 490 Resources.multiply(resourceLimit, maxReducePreemptionLimit), 491 reduceResourceRequest, getSchedulerResourceTypes()); 492 int preemptionReduceNumForAllMaps = 493 ResourceCalculatorUtils.divideAndCeilContainers( 494 Resources.multiply(mapResourceRequest, hangingMapRequests), 495 reduceResourceRequest, getSchedulerResourceTypes()); 496 int toPreempt = 497 Math.min(Math.max(preemptionReduceNumForOneMap, 498 preemptionReduceNumForPreemptionLimit), 499 preemptionReduceNumForAllMaps); 500 501 LOG.info("Going to preempt " + toPreempt 502 + " due to lack of space for maps"); 503 assignedRequests.preemptReduce(toPreempt); 504 } 505 } 506 } 507 } 508 getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap)509 private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) { 510 if (allocationDelayThresholdMs <= 0) 511 return requestMap.size(); 512 int hangingRequests = 0; 513 long currTime = clock.getTime(); 514 for (ContainerRequest request: requestMap.values()) { 515 long delay = currTime - request.requestTimeMs; 516 if (delay > allocationDelayThresholdMs) 517 hangingRequests++; 518 } 519 return hangingRequests; 520 } 521 522 @Private scheduleReduces( int totalMaps, int completedMaps, int scheduledMaps, int scheduledReduces, int assignedMaps, int assignedReduces, Resource mapResourceReqt, Resource reduceResourceReqt, int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart)523 public void scheduleReduces( 524 int totalMaps, int completedMaps, 525 int scheduledMaps, int scheduledReduces, 526 int assignedMaps, int assignedReduces, 527 Resource mapResourceReqt, Resource reduceResourceReqt, 528 int numPendingReduces, 529 float maxReduceRampupLimit, float reduceSlowStart) { 530 531 if (numPendingReduces == 0) { 532 return; 533 } 534 535 // get available resources for this job 536 Resource headRoom = getAvailableResources(); 537 if (headRoom == null) { 538 headRoom = Resources.none(); 539 } 540 541 LOG.info("Recalculating schedule, headroom=" + headRoom); 542 543 //check for slow start 544 if (!getIsReduceStarted()) {//not set yet 545 int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * 546 totalMaps); 547 if(completedMaps < completedMapsForReduceSlowstart) { 548 LOG.info("Reduce slow start threshold not met. " + 549 "completedMapsForReduceSlowstart " + 550 completedMapsForReduceSlowstart); 551 return; 552 } else { 553 LOG.info("Reduce slow start threshold reached. Scheduling reduces."); 554 setIsReduceStarted(true); 555 } 556 } 557 558 //if all maps are assigned, then ramp up all reduces irrespective of the 559 //headroom 560 if (scheduledMaps == 0 && numPendingReduces > 0) { 561 LOG.info("All maps assigned. " + 562 "Ramping up all remaining reduces:" + numPendingReduces); 563 scheduleAllReduces(); 564 return; 565 } 566 567 float completedMapPercent = 0f; 568 if (totalMaps != 0) {//support for 0 maps 569 completedMapPercent = (float)completedMaps/totalMaps; 570 } else { 571 completedMapPercent = 1; 572 } 573 574 Resource netScheduledMapResource = 575 Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps)); 576 577 Resource netScheduledReduceResource = 578 Resources.multiply(reduceResourceReqt, 579 (scheduledReduces + assignedReduces)); 580 581 Resource finalMapResourceLimit; 582 Resource finalReduceResourceLimit; 583 584 // ramp up the reduces based on completed map percentage 585 Resource totalResourceLimit = getResourceLimit(); 586 587 Resource idealReduceResourceLimit = 588 Resources.multiply(totalResourceLimit, 589 Math.min(completedMapPercent, maxReduceRampupLimit)); 590 Resource ideaMapResourceLimit = 591 Resources.subtract(totalResourceLimit, idealReduceResourceLimit); 592 593 // check if there aren't enough maps scheduled, give the free map capacity 594 // to reduce. 595 // Even when container number equals, there may be unused resources in one 596 // dimension 597 if (ResourceCalculatorUtils.computeAvailableContainers(ideaMapResourceLimit, 598 mapResourceReqt, getSchedulerResourceTypes()) >= (scheduledMaps + assignedMaps)) { 599 // enough resource given to maps, given the remaining to reduces 600 Resource unusedMapResourceLimit = 601 Resources.subtract(ideaMapResourceLimit, netScheduledMapResource); 602 finalReduceResourceLimit = 603 Resources.add(idealReduceResourceLimit, unusedMapResourceLimit); 604 finalMapResourceLimit = 605 Resources.subtract(totalResourceLimit, finalReduceResourceLimit); 606 } else { 607 finalMapResourceLimit = ideaMapResourceLimit; 608 finalReduceResourceLimit = idealReduceResourceLimit; 609 } 610 611 LOG.info("completedMapPercent " + completedMapPercent 612 + " totalResourceLimit:" + totalResourceLimit 613 + " finalMapResourceLimit:" + finalMapResourceLimit 614 + " finalReduceResourceLimit:" + finalReduceResourceLimit 615 + " netScheduledMapResource:" + netScheduledMapResource 616 + " netScheduledReduceResource:" + netScheduledReduceResource); 617 618 int rampUp = 619 ResourceCalculatorUtils.computeAvailableContainers(Resources.subtract( 620 finalReduceResourceLimit, netScheduledReduceResource), 621 reduceResourceReqt, getSchedulerResourceTypes()); 622 623 if (rampUp > 0) { 624 rampUp = Math.min(rampUp, numPendingReduces); 625 LOG.info("Ramping up " + rampUp); 626 rampUpReduces(rampUp); 627 } else if (rampUp < 0) { 628 int rampDown = -1 * rampUp; 629 rampDown = Math.min(rampDown, scheduledReduces); 630 LOG.info("Ramping down " + rampDown); 631 rampDownReduces(rampDown); 632 } 633 } 634 635 @Private scheduleAllReduces()636 public void scheduleAllReduces() { 637 for (ContainerRequest req : pendingReduces) { 638 scheduledRequests.addReduce(req); 639 } 640 pendingReduces.clear(); 641 } 642 643 @Private rampUpReduces(int rampUp)644 public void rampUpReduces(int rampUp) { 645 //more reduce to be scheduled 646 for (int i = 0; i < rampUp; i++) { 647 ContainerRequest request = pendingReduces.removeFirst(); 648 scheduledRequests.addReduce(request); 649 } 650 } 651 652 @Private rampDownReduces(int rampDown)653 public void rampDownReduces(int rampDown) { 654 //remove from the scheduled and move back to pending 655 for (int i = 0; i < rampDown; i++) { 656 ContainerRequest request = scheduledRequests.removeReduce(); 657 pendingReduces.add(request); 658 } 659 } 660 661 @SuppressWarnings("unchecked") getResources()662 private List<Container> getResources() throws Exception { 663 applyConcurrentTaskLimits(); 664 665 // will be null the first time 666 Resource headRoom = 667 getAvailableResources() == null ? Resources.none() : 668 Resources.clone(getAvailableResources()); 669 AllocateResponse response; 670 /* 671 * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS 672 * milliseconds before aborting. During this interval, AM will still try 673 * to contact the RM. 674 */ 675 try { 676 response = makeRemoteRequest(); 677 // Reset retry count if no exception occurred. 678 retrystartTime = System.currentTimeMillis(); 679 } catch (ApplicationAttemptNotFoundException e ) { 680 // This can happen if the RM has been restarted. If it is in that state, 681 // this application must clean itself up. 682 eventHandler.handle(new JobEvent(this.getJob().getID(), 683 JobEventType.JOB_AM_REBOOT)); 684 throw new RMContainerAllocationException( 685 "Resource Manager doesn't recognize AttemptId: " 686 + this.getContext().getApplicationAttemptId(), e); 687 } catch (ApplicationMasterNotRegisteredException e) { 688 LOG.info("ApplicationMaster is out of sync with ResourceManager," 689 + " hence resync and send outstanding requests."); 690 // RM may have restarted, re-register with RM. 691 lastResponseID = 0; 692 register(); 693 addOutstandingRequestOnResync(); 694 return null; 695 } catch (Exception e) { 696 // This can happen when the connection to the RM has gone down. Keep 697 // re-trying until the retryInterval has expired. 698 if (System.currentTimeMillis() - retrystartTime >= retryInterval) { 699 LOG.error("Could not contact RM after " + retryInterval + " milliseconds."); 700 eventHandler.handle(new JobEvent(this.getJob().getID(), 701 JobEventType.JOB_AM_REBOOT)); 702 throw new RMContainerAllocationException("Could not contact RM after " + 703 retryInterval + " milliseconds."); 704 } 705 // Throw this up to the caller, which may decide to ignore it and 706 // continue to attempt to contact the RM. 707 throw e; 708 } 709 Resource newHeadRoom = 710 getAvailableResources() == null ? Resources.none() 711 : getAvailableResources(); 712 List<Container> newContainers = response.getAllocatedContainers(); 713 // Setting NMTokens 714 if (response.getNMTokens() != null) { 715 for (NMToken nmToken : response.getNMTokens()) { 716 NMTokenCache.setNMToken(nmToken.getNodeId().toString(), 717 nmToken.getToken()); 718 } 719 } 720 721 // Setting AMRMToken 722 if (response.getAMRMToken() != null) { 723 updateAMRMToken(response.getAMRMToken()); 724 } 725 726 List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses(); 727 if (newContainers.size() + finishedContainers.size() > 0 728 || !headRoom.equals(newHeadRoom)) { 729 //something changed 730 recalculateReduceSchedule = true; 731 if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) { 732 LOG.debug("headroom=" + newHeadRoom); 733 } 734 } 735 736 if (LOG.isDebugEnabled()) { 737 for (Container cont : newContainers) { 738 LOG.debug("Received new Container :" + cont); 739 } 740 } 741 742 //Called on each allocation. Will know about newly blacklisted/added hosts. 743 computeIgnoreBlacklisting(); 744 745 handleUpdatedNodes(response); 746 747 for (ContainerStatus cont : finishedContainers) { 748 LOG.info("Received completed container " + cont.getContainerId()); 749 TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId()); 750 if (attemptID == null) { 751 LOG.error("Container complete event for unknown container id " 752 + cont.getContainerId()); 753 } else { 754 pendingRelease.remove(cont.getContainerId()); 755 assignedRequests.remove(attemptID); 756 757 // send the container completed event to Task attempt 758 eventHandler.handle(createContainerFinishedEvent(cont, attemptID)); 759 760 // Send the diagnostics 761 String diagnostics = StringInterner.weakIntern(cont.getDiagnostics()); 762 eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, 763 diagnostics)); 764 } 765 } 766 return newContainers; 767 } 768 applyConcurrentTaskLimits()769 private void applyConcurrentTaskLimits() { 770 int numScheduledMaps = scheduledRequests.maps.size(); 771 if (maxRunningMaps > 0 && numScheduledMaps > 0) { 772 int maxRequestedMaps = Math.max(0, 773 maxRunningMaps - assignedRequests.maps.size()); 774 int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size(); 775 int failedMapRequestLimit = Math.min(maxRequestedMaps, 776 numScheduledFailMaps); 777 int normalMapRequestLimit = Math.min( 778 maxRequestedMaps - failedMapRequestLimit, 779 numScheduledMaps - numScheduledFailMaps); 780 setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest, 781 failedMapRequestLimit); 782 setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit); 783 } 784 785 int numScheduledReduces = scheduledRequests.reduces.size(); 786 if (maxRunningReduces > 0 && numScheduledReduces > 0) { 787 int maxRequestedReduces = Math.max(0, 788 maxRunningReduces - assignedRequests.reduces.size()); 789 int reduceRequestLimit = Math.min(maxRequestedReduces, 790 numScheduledReduces); 791 setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest, 792 reduceRequestLimit); 793 } 794 } 795 canAssignMaps()796 private boolean canAssignMaps() { 797 return (maxRunningMaps <= 0 798 || assignedRequests.maps.size() < maxRunningMaps); 799 } 800 canAssignReduces()801 private boolean canAssignReduces() { 802 return (maxRunningReduces <= 0 803 || assignedRequests.reduces.size() < maxRunningReduces); 804 } 805 updateAMRMToken(Token token)806 private void updateAMRMToken(Token token) throws IOException { 807 org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = 808 new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token 809 .getIdentifier().array(), token.getPassword().array(), new Text( 810 token.getKind()), new Text(token.getService())); 811 UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); 812 currentUGI.addToken(amrmToken); 813 amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); 814 } 815 816 @VisibleForTesting createContainerFinishedEvent(ContainerStatus cont, TaskAttemptId attemptID)817 public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, 818 TaskAttemptId attemptID) { 819 if (cont.getExitStatus() == ContainerExitStatus.ABORTED 820 || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) { 821 // killed by framework 822 return new TaskAttemptEvent(attemptID, 823 TaskAttemptEventType.TA_KILL); 824 } else { 825 return new TaskAttemptEvent(attemptID, 826 TaskAttemptEventType.TA_CONTAINER_COMPLETED); 827 } 828 } 829 830 @SuppressWarnings("unchecked") handleUpdatedNodes(AllocateResponse response)831 private void handleUpdatedNodes(AllocateResponse response) { 832 // send event to the job about on updated nodes 833 List<NodeReport> updatedNodes = response.getUpdatedNodes(); 834 if (!updatedNodes.isEmpty()) { 835 836 // send event to the job to act upon completed tasks 837 eventHandler.handle(new JobUpdatedNodesEvent(getJob().getID(), 838 updatedNodes)); 839 840 // act upon running tasks 841 HashSet<NodeId> unusableNodes = new HashSet<NodeId>(); 842 for (NodeReport nr : updatedNodes) { 843 NodeState nodeState = nr.getNodeState(); 844 if (nodeState.isUnusable()) { 845 unusableNodes.add(nr.getNodeId()); 846 } 847 } 848 for (int i = 0; i < 2; ++i) { 849 HashMap<TaskAttemptId, Container> taskSet = i == 0 ? assignedRequests.maps 850 : assignedRequests.reduces; 851 // kill running containers 852 for (Map.Entry<TaskAttemptId, Container> entry : taskSet.entrySet()) { 853 TaskAttemptId tid = entry.getKey(); 854 NodeId taskAttemptNodeId = entry.getValue().getNodeId(); 855 if (unusableNodes.contains(taskAttemptNodeId)) { 856 LOG.info("Killing taskAttempt:" + tid 857 + " because it is running on unusable node:" 858 + taskAttemptNodeId); 859 eventHandler.handle(new TaskAttemptKillEvent(tid, 860 "TaskAttempt killed because it ran on unusable node" 861 + taskAttemptNodeId)); 862 } 863 } 864 } 865 } 866 } 867 868 @Private getResourceLimit()869 public Resource getResourceLimit() { 870 Resource headRoom = getAvailableResources(); 871 if (headRoom == null) { 872 headRoom = Resources.none(); 873 } 874 Resource assignedMapResource = 875 Resources.multiply(mapResourceRequest, assignedRequests.maps.size()); 876 Resource assignedReduceResource = 877 Resources.multiply(reduceResourceRequest, 878 assignedRequests.reduces.size()); 879 return Resources.add(headRoom, 880 Resources.add(assignedMapResource, assignedReduceResource)); 881 } 882 883 @Private 884 @VisibleForTesting 885 class ScheduledRequests { 886 887 private final LinkedList<TaskAttemptId> earlierFailedMaps = 888 new LinkedList<TaskAttemptId>(); 889 890 /** Maps from a host to a list of Map tasks with data on the host */ 891 private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping = 892 new HashMap<String, LinkedList<TaskAttemptId>>(); 893 private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 894 new HashMap<String, LinkedList<TaskAttemptId>>(); 895 @VisibleForTesting 896 final Map<TaskAttemptId, ContainerRequest> maps = 897 new LinkedHashMap<TaskAttemptId, ContainerRequest>(); 898 899 private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = 900 new LinkedHashMap<TaskAttemptId, ContainerRequest>(); 901 remove(TaskAttemptId tId)902 boolean remove(TaskAttemptId tId) { 903 ContainerRequest req = null; 904 if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { 905 req = maps.remove(tId); 906 } else { 907 req = reduces.remove(tId); 908 } 909 910 if (req == null) { 911 return false; 912 } else { 913 decContainerReq(req); 914 return true; 915 } 916 } 917 removeReduce()918 ContainerRequest removeReduce() { 919 Iterator<Entry<TaskAttemptId, ContainerRequest>> it = reduces.entrySet().iterator(); 920 if (it.hasNext()) { 921 Entry<TaskAttemptId, ContainerRequest> entry = it.next(); 922 it.remove(); 923 decContainerReq(entry.getValue()); 924 return entry.getValue(); 925 } 926 return null; 927 } 928 addMap(ContainerRequestEvent event)929 void addMap(ContainerRequestEvent event) { 930 ContainerRequest request = null; 931 932 if (event.getEarlierAttemptFailed()) { 933 earlierFailedMaps.add(event.getAttemptID()); 934 request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP); 935 LOG.info("Added "+event.getAttemptID()+" to list of failed maps"); 936 } else { 937 for (String host : event.getHosts()) { 938 LinkedList<TaskAttemptId> list = mapsHostMapping.get(host); 939 if (list == null) { 940 list = new LinkedList<TaskAttemptId>(); 941 mapsHostMapping.put(host, list); 942 } 943 list.add(event.getAttemptID()); 944 if (LOG.isDebugEnabled()) { 945 LOG.debug("Added attempt req to host " + host); 946 } 947 } 948 for (String rack: event.getRacks()) { 949 LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack); 950 if (list == null) { 951 list = new LinkedList<TaskAttemptId>(); 952 mapsRackMapping.put(rack, list); 953 } 954 list.add(event.getAttemptID()); 955 if (LOG.isDebugEnabled()) { 956 LOG.debug("Added attempt req to rack " + rack); 957 } 958 } 959 request = new ContainerRequest(event, PRIORITY_MAP); 960 } 961 maps.put(event.getAttemptID(), request); 962 addContainerReq(request); 963 } 964 965 addReduce(ContainerRequest req)966 void addReduce(ContainerRequest req) { 967 reduces.put(req.attemptID, req); 968 addContainerReq(req); 969 } 970 971 // this method will change the list of allocatedContainers. assign(List<Container> allocatedContainers)972 private void assign(List<Container> allocatedContainers) { 973 Iterator<Container> it = allocatedContainers.iterator(); 974 LOG.info("Got allocated containers " + allocatedContainers.size()); 975 containersAllocated += allocatedContainers.size(); 976 while (it.hasNext()) { 977 Container allocated = it.next(); 978 if (LOG.isDebugEnabled()) { 979 LOG.debug("Assigning container " + allocated.getId() 980 + " with priority " + allocated.getPriority() + " to NM " 981 + allocated.getNodeId()); 982 } 983 984 // check if allocated container meets memory requirements 985 // and whether we have any scheduled tasks that need 986 // a container to be assigned 987 boolean isAssignable = true; 988 Priority priority = allocated.getPriority(); 989 Resource allocatedResource = allocated.getResource(); 990 if (PRIORITY_FAST_FAIL_MAP.equals(priority) 991 || PRIORITY_MAP.equals(priority)) { 992 if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource, 993 mapResourceRequest, getSchedulerResourceTypes()) <= 0 994 || maps.isEmpty()) { 995 LOG.info("Cannot assign container " + allocated 996 + " for a map as either " 997 + " container memory less than required " + mapResourceRequest 998 + " or no pending map tasks - maps.isEmpty=" 999 + maps.isEmpty()); 1000 isAssignable = false; 1001 } 1002 } 1003 else if (PRIORITY_REDUCE.equals(priority)) { 1004 if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource, 1005 reduceResourceRequest, getSchedulerResourceTypes()) <= 0 1006 || reduces.isEmpty()) { 1007 LOG.info("Cannot assign container " + allocated 1008 + " for a reduce as either " 1009 + " container memory less than required " + reduceResourceRequest 1010 + " or no pending reduce tasks - reduces.isEmpty=" 1011 + reduces.isEmpty()); 1012 isAssignable = false; 1013 } 1014 } else { 1015 LOG.warn("Container allocated at unwanted priority: " + priority + 1016 ". Returning to RM..."); 1017 isAssignable = false; 1018 } 1019 1020 if(!isAssignable) { 1021 // release container if we could not assign it 1022 containerNotAssigned(allocated); 1023 it.remove(); 1024 continue; 1025 } 1026 1027 // do not assign if allocated container is on a 1028 // blacklisted host 1029 String allocatedHost = allocated.getNodeId().getHost(); 1030 if (isNodeBlacklisted(allocatedHost)) { 1031 // we need to request for a new container 1032 // and release the current one 1033 LOG.info("Got allocated container on a blacklisted " 1034 + " host "+allocatedHost 1035 +". Releasing container " + allocated); 1036 1037 // find the request matching this allocated container 1038 // and replace it with a new one 1039 ContainerRequest toBeReplacedReq = 1040 getContainerReqToReplace(allocated); 1041 if (toBeReplacedReq != null) { 1042 LOG.info("Placing a new container request for task attempt " 1043 + toBeReplacedReq.attemptID); 1044 ContainerRequest newReq = 1045 getFilteredContainerRequest(toBeReplacedReq); 1046 decContainerReq(toBeReplacedReq); 1047 if (toBeReplacedReq.attemptID.getTaskId().getTaskType() == 1048 TaskType.MAP) { 1049 maps.put(newReq.attemptID, newReq); 1050 } 1051 else { 1052 reduces.put(newReq.attemptID, newReq); 1053 } 1054 addContainerReq(newReq); 1055 } 1056 else { 1057 LOG.info("Could not map allocated container to a valid request." 1058 + " Releasing allocated container " + allocated); 1059 } 1060 1061 // release container if we could not assign it 1062 containerNotAssigned(allocated); 1063 it.remove(); 1064 continue; 1065 } 1066 } 1067 1068 assignContainers(allocatedContainers); 1069 1070 // release container if we could not assign it 1071 it = allocatedContainers.iterator(); 1072 while (it.hasNext()) { 1073 Container allocated = it.next(); 1074 LOG.info("Releasing unassigned container " + allocated); 1075 containerNotAssigned(allocated); 1076 } 1077 } 1078 1079 @SuppressWarnings("unchecked") containerAssigned(Container allocated, ContainerRequest assigned)1080 private void containerAssigned(Container allocated, 1081 ContainerRequest assigned) { 1082 // Update resource requests 1083 decContainerReq(assigned); 1084 1085 // send the container-assigned event to task attempt 1086 eventHandler.handle(new TaskAttemptContainerAssignedEvent( 1087 assigned.attemptID, allocated, applicationACLs)); 1088 1089 assignedRequests.add(allocated, assigned.attemptID); 1090 1091 if (LOG.isDebugEnabled()) { 1092 LOG.info("Assigned container (" + allocated + ") " 1093 + " to task " + assigned.attemptID + " on node " 1094 + allocated.getNodeId().toString()); 1095 } 1096 } 1097 containerNotAssigned(Container allocated)1098 private void containerNotAssigned(Container allocated) { 1099 containersReleased++; 1100 pendingRelease.add(allocated.getId()); 1101 release(allocated.getId()); 1102 } 1103 assignWithoutLocality(Container allocated)1104 private ContainerRequest assignWithoutLocality(Container allocated) { 1105 ContainerRequest assigned = null; 1106 1107 Priority priority = allocated.getPriority(); 1108 if (PRIORITY_FAST_FAIL_MAP.equals(priority)) { 1109 LOG.info("Assigning container " + allocated + " to fast fail map"); 1110 assigned = assignToFailedMap(allocated); 1111 } else if (PRIORITY_REDUCE.equals(priority)) { 1112 if (LOG.isDebugEnabled()) { 1113 LOG.debug("Assigning container " + allocated + " to reduce"); 1114 } 1115 assigned = assignToReduce(allocated); 1116 } 1117 1118 return assigned; 1119 } 1120 assignContainers(List<Container> allocatedContainers)1121 private void assignContainers(List<Container> allocatedContainers) { 1122 Iterator<Container> it = allocatedContainers.iterator(); 1123 while (it.hasNext()) { 1124 Container allocated = it.next(); 1125 ContainerRequest assigned = assignWithoutLocality(allocated); 1126 if (assigned != null) { 1127 containerAssigned(allocated, assigned); 1128 it.remove(); 1129 } 1130 } 1131 1132 assignMapsWithLocality(allocatedContainers); 1133 } 1134 getContainerReqToReplace(Container allocated)1135 private ContainerRequest getContainerReqToReplace(Container allocated) { 1136 LOG.info("Finding containerReq for allocated container: " + allocated); 1137 Priority priority = allocated.getPriority(); 1138 ContainerRequest toBeReplaced = null; 1139 if (PRIORITY_FAST_FAIL_MAP.equals(priority)) { 1140 LOG.info("Replacing FAST_FAIL_MAP container " + allocated.getId()); 1141 Iterator<TaskAttemptId> iter = earlierFailedMaps.iterator(); 1142 while (toBeReplaced == null && iter.hasNext()) { 1143 toBeReplaced = maps.get(iter.next()); 1144 } 1145 LOG.info("Found replacement: " + toBeReplaced); 1146 return toBeReplaced; 1147 } 1148 else if (PRIORITY_MAP.equals(priority)) { 1149 LOG.info("Replacing MAP container " + allocated.getId()); 1150 // allocated container was for a map 1151 String host = allocated.getNodeId().getHost(); 1152 LinkedList<TaskAttemptId> list = mapsHostMapping.get(host); 1153 if (list != null && list.size() > 0) { 1154 TaskAttemptId tId = list.removeLast(); 1155 if (maps.containsKey(tId)) { 1156 toBeReplaced = maps.remove(tId); 1157 } 1158 } 1159 else { 1160 TaskAttemptId tId = maps.keySet().iterator().next(); 1161 toBeReplaced = maps.remove(tId); 1162 } 1163 } 1164 else if (PRIORITY_REDUCE.equals(priority)) { 1165 TaskAttemptId tId = reduces.keySet().iterator().next(); 1166 toBeReplaced = reduces.remove(tId); 1167 } 1168 LOG.info("Found replacement: " + toBeReplaced); 1169 return toBeReplaced; 1170 } 1171 1172 1173 @SuppressWarnings("unchecked") assignToFailedMap(Container allocated)1174 private ContainerRequest assignToFailedMap(Container allocated) { 1175 //try to assign to earlierFailedMaps if present 1176 ContainerRequest assigned = null; 1177 while (assigned == null && earlierFailedMaps.size() > 0 1178 && canAssignMaps()) { 1179 TaskAttemptId tId = earlierFailedMaps.removeFirst(); 1180 if (maps.containsKey(tId)) { 1181 assigned = maps.remove(tId); 1182 JobCounterUpdateEvent jce = 1183 new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); 1184 jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1); 1185 eventHandler.handle(jce); 1186 LOG.info("Assigned from earlierFailedMaps"); 1187 break; 1188 } 1189 } 1190 return assigned; 1191 } 1192 assignToReduce(Container allocated)1193 private ContainerRequest assignToReduce(Container allocated) { 1194 ContainerRequest assigned = null; 1195 //try to assign to reduces if present 1196 if (assigned == null && reduces.size() > 0 && canAssignReduces()) { 1197 TaskAttemptId tId = reduces.keySet().iterator().next(); 1198 assigned = reduces.remove(tId); 1199 LOG.info("Assigned to reduce"); 1200 } 1201 return assigned; 1202 } 1203 1204 @SuppressWarnings("unchecked") assignMapsWithLocality(List<Container> allocatedContainers)1205 private void assignMapsWithLocality(List<Container> allocatedContainers) { 1206 // try to assign to all nodes first to match node local 1207 Iterator<Container> it = allocatedContainers.iterator(); 1208 while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ 1209 Container allocated = it.next(); 1210 Priority priority = allocated.getPriority(); 1211 assert PRIORITY_MAP.equals(priority); 1212 // "if (maps.containsKey(tId))" below should be almost always true. 1213 // hence this while loop would almost always have O(1) complexity 1214 String host = allocated.getNodeId().getHost(); 1215 LinkedList<TaskAttemptId> list = mapsHostMapping.get(host); 1216 while (list != null && list.size() > 0) { 1217 if (LOG.isDebugEnabled()) { 1218 LOG.debug("Host matched to the request list " + host); 1219 } 1220 TaskAttemptId tId = list.removeFirst(); 1221 if (maps.containsKey(tId)) { 1222 ContainerRequest assigned = maps.remove(tId); 1223 containerAssigned(allocated, assigned); 1224 it.remove(); 1225 JobCounterUpdateEvent jce = 1226 new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); 1227 jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1); 1228 eventHandler.handle(jce); 1229 hostLocalAssigned++; 1230 if (LOG.isDebugEnabled()) { 1231 LOG.debug("Assigned based on host match " + host); 1232 } 1233 break; 1234 } 1235 } 1236 } 1237 1238 // try to match all rack local 1239 it = allocatedContainers.iterator(); 1240 while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ 1241 Container allocated = it.next(); 1242 Priority priority = allocated.getPriority(); 1243 assert PRIORITY_MAP.equals(priority); 1244 // "if (maps.containsKey(tId))" below should be almost always true. 1245 // hence this while loop would almost always have O(1) complexity 1246 String host = allocated.getNodeId().getHost(); 1247 String rack = RackResolver.resolve(host).getNetworkLocation(); 1248 LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack); 1249 while (list != null && list.size() > 0) { 1250 TaskAttemptId tId = list.removeFirst(); 1251 if (maps.containsKey(tId)) { 1252 ContainerRequest assigned = maps.remove(tId); 1253 containerAssigned(allocated, assigned); 1254 it.remove(); 1255 JobCounterUpdateEvent jce = 1256 new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); 1257 jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1); 1258 eventHandler.handle(jce); 1259 rackLocalAssigned++; 1260 if (LOG.isDebugEnabled()) { 1261 LOG.debug("Assigned based on rack match " + rack); 1262 } 1263 break; 1264 } 1265 } 1266 } 1267 1268 // assign remaining 1269 it = allocatedContainers.iterator(); 1270 while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ 1271 Container allocated = it.next(); 1272 Priority priority = allocated.getPriority(); 1273 assert PRIORITY_MAP.equals(priority); 1274 TaskAttemptId tId = maps.keySet().iterator().next(); 1275 ContainerRequest assigned = maps.remove(tId); 1276 containerAssigned(allocated, assigned); 1277 it.remove(); 1278 JobCounterUpdateEvent jce = 1279 new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); 1280 jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1); 1281 eventHandler.handle(jce); 1282 if (LOG.isDebugEnabled()) { 1283 LOG.debug("Assigned based on * match"); 1284 } 1285 } 1286 } 1287 } 1288 1289 @Private 1290 @VisibleForTesting 1291 class AssignedRequests { 1292 private final Map<ContainerId, TaskAttemptId> containerToAttemptMap = 1293 new HashMap<ContainerId, TaskAttemptId>(); 1294 private final LinkedHashMap<TaskAttemptId, Container> maps = 1295 new LinkedHashMap<TaskAttemptId, Container>(); 1296 @VisibleForTesting 1297 final LinkedHashMap<TaskAttemptId, Container> reduces = 1298 new LinkedHashMap<TaskAttemptId, Container>(); 1299 @VisibleForTesting 1300 final Set<TaskAttemptId> preemptionWaitingReduces = 1301 new HashSet<TaskAttemptId>(); 1302 add(Container container, TaskAttemptId tId)1303 void add(Container container, TaskAttemptId tId) { 1304 LOG.info("Assigned container " + container.getId().toString() + " to " + tId); 1305 containerToAttemptMap.put(container.getId(), tId); 1306 if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { 1307 maps.put(tId, container); 1308 } else { 1309 reduces.put(tId, container); 1310 } 1311 } 1312 1313 @SuppressWarnings("unchecked") preemptReduce(int toPreempt)1314 void preemptReduce(int toPreempt) { 1315 List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId> 1316 (reduces.keySet()); 1317 //sort reduces on progress 1318 Collections.sort(reduceList, 1319 new Comparator<TaskAttemptId>() { 1320 @Override 1321 public int compare(TaskAttemptId o1, TaskAttemptId o2) { 1322 return Float.compare( 1323 getJob().getTask(o1.getTaskId()).getAttempt(o1).getProgress(), 1324 getJob().getTask(o2.getTaskId()).getAttempt(o2).getProgress()); 1325 } 1326 }); 1327 1328 for (int i = 0; i < toPreempt && reduceList.size() > 0; i++) { 1329 TaskAttemptId id = reduceList.remove(0);//remove the one on top 1330 LOG.info("Preempting " + id); 1331 preemptionWaitingReduces.add(id); 1332 eventHandler.handle(new TaskAttemptKillEvent(id, RAMPDOWN_DIAGNOSTIC)); 1333 } 1334 } 1335 remove(TaskAttemptId tId)1336 boolean remove(TaskAttemptId tId) { 1337 ContainerId containerId = null; 1338 if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { 1339 containerId = maps.remove(tId).getId(); 1340 } else { 1341 containerId = reduces.remove(tId).getId(); 1342 if (containerId != null) { 1343 boolean preempted = preemptionWaitingReduces.remove(tId); 1344 if (preempted) { 1345 LOG.info("Reduce preemption successful " + tId); 1346 } 1347 } 1348 } 1349 1350 if (containerId != null) { 1351 containerToAttemptMap.remove(containerId); 1352 return true; 1353 } 1354 return false; 1355 } 1356 get(ContainerId cId)1357 TaskAttemptId get(ContainerId cId) { 1358 return containerToAttemptMap.get(cId); 1359 } 1360 get(TaskAttemptId tId)1361 ContainerId get(TaskAttemptId tId) { 1362 Container taskContainer; 1363 if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { 1364 taskContainer = maps.get(tId); 1365 } else { 1366 taskContainer = reduces.get(tId); 1367 } 1368 1369 if (taskContainer == null) { 1370 return null; 1371 } else { 1372 return taskContainer.getId(); 1373 } 1374 } 1375 } 1376 1377 private class ScheduleStats { 1378 int numPendingReduces; 1379 int numScheduledMaps; 1380 int numScheduledReduces; 1381 int numAssignedMaps; 1382 int numAssignedReduces; 1383 int numCompletedMaps; 1384 int numCompletedReduces; 1385 int numContainersAllocated; 1386 int numContainersReleased; 1387 updateAndLogIfChanged(String msgPrefix)1388 public void updateAndLogIfChanged(String msgPrefix) { 1389 boolean changed = false; 1390 1391 // synchronized to fix findbug warnings 1392 synchronized (RMContainerAllocator.this) { 1393 changed |= (numPendingReduces != pendingReduces.size()); 1394 numPendingReduces = pendingReduces.size(); 1395 changed |= (numScheduledMaps != scheduledRequests.maps.size()); 1396 numScheduledMaps = scheduledRequests.maps.size(); 1397 changed |= (numScheduledReduces != scheduledRequests.reduces.size()); 1398 numScheduledReduces = scheduledRequests.reduces.size(); 1399 changed |= (numAssignedMaps != assignedRequests.maps.size()); 1400 numAssignedMaps = assignedRequests.maps.size(); 1401 changed |= (numAssignedReduces != assignedRequests.reduces.size()); 1402 numAssignedReduces = assignedRequests.reduces.size(); 1403 changed |= (numCompletedMaps != getJob().getCompletedMaps()); 1404 numCompletedMaps = getJob().getCompletedMaps(); 1405 changed |= (numCompletedReduces != getJob().getCompletedReduces()); 1406 numCompletedReduces = getJob().getCompletedReduces(); 1407 changed |= (numContainersAllocated != containersAllocated); 1408 numContainersAllocated = containersAllocated; 1409 changed |= (numContainersReleased != containersReleased); 1410 numContainersReleased = containersReleased; 1411 } 1412 1413 if (changed) { 1414 log(msgPrefix); 1415 } 1416 } 1417 log(String msgPrefix)1418 public void log(String msgPrefix) { 1419 LOG.info(msgPrefix + "PendingReds:" + numPendingReduces + 1420 " ScheduledMaps:" + numScheduledMaps + 1421 " ScheduledReds:" + numScheduledReduces + 1422 " AssignedMaps:" + numAssignedMaps + 1423 " AssignedReds:" + numAssignedReduces + 1424 " CompletedMaps:" + numCompletedMaps + 1425 " CompletedReds:" + numCompletedReduces + 1426 " ContAlloc:" + numContainersAllocated + 1427 " ContRel:" + numContainersReleased + 1428 " HostLocal:" + hostLocalAssigned + 1429 " RackLocal:" + rackLocalAssigned); 1430 } 1431 } 1432 } 1433