/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.app.rm;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

import com.google.common.annotations.VisibleForTesting;


/**
 * Keeps the data structures to send container requests to RM.
 *
 * <p>The class maintains a table of outstanding resource requests, the set
 * of requests and container releases to send on the next allocate call, an
 * optional per-request container limit, and host blacklisting state derived
 * from task failures.
 */
public abstract class RMContainerRequestor extends RMCommunicator {

  private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
  private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR =
      new ResourceRequestComparator();

  /** Response id of the most recent allocate call; sent back on the next one. */
  protected int lastResponseID;
  /** Available resources reported by the most recent allocate response. */
  private Resource availableResources;

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  // Key -> Priority
  // Value -> Map
  //   Key -> ResourceName (e.g., hostname, rackname, *)
  //   Value -> Map
  //     Key -> Resource Capability
  //     Value -> ResourceRequest
  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
      remoteRequestsTable =
          new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();

  // Use a custom comparator to make sure ResourceRequest objects differing
  // only in numContainers don't end up as duplicates.
  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
      RESOURCE_REQUEST_COMPARATOR);
  private final Set<ContainerId> release = new TreeSet<ContainerId>();
  // pendingRelease holds the history of release requests; a request is
  // removed only if the RM sends a completedContainer for it.
  // How is it different from release? --> release is per allocate() request.
  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();

  // Limits keyed by an ANY-level request (priority + capability); the value's
  // numContainers is the limit itself.
  private final Map<ResourceRequest, ResourceRequest> requestLimits =
      new TreeMap<ResourceRequest, ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
  // Limits whose corresponding ask must be (re)sent on the next allocate call.
  private final Set<ResourceRequest> requestLimitsToUpdate =
      new TreeSet<ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);

  private boolean nodeBlacklistingEnabled;
  private int blacklistDisablePercent;
  private final AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
  private int blacklistedNodeCount = 0;
  private int lastClusterNmCount = 0;
  private int clusterNmCount = 0;
  private int maxTaskFailuresPerNode;
  private final Map<String, Integer> nodeFailures =
      new HashMap<String, Integer>();
  private final Set<String> blacklistedNodes = Collections
      .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
  // Blacklist deltas to send on the next allocate call.
  private final Set<String> blacklistAdditions = Collections
      .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
  private final Set<String> blacklistRemovals = Collections
      .newSetFromMap(new ConcurrentHashMap<String, Boolean>());

  /**
   * Creates the requestor; both arguments are passed straight to the
   * superclass constructor.
   *
   * @param clientService client service handed to {@link RMCommunicator}
   * @param context application context handed to {@link RMCommunicator}
   */
  public RMContainerRequestor(ClientService clientService, AppContext context) {
    super(clientService, context);
  }

  /**
   * Immutable description of a single container request: the task attempt it
   * is for, the requested capability, preferred hosts and racks, priority and
   * the time the request object was created.
   */
  @Private
  @VisibleForTesting
  static class ContainerRequest {
    final TaskAttemptId attemptID;
    final Resource capability;
    final String[] hosts;
    final String[] racks;
    //final boolean earlierAttemptFailed;
    final Priority priority;
    /**
     * the time when this request object was formed; can be used to avoid
     * aggressive preemption for recently placed requests
     */
    final long requestTimeMs;

    /**
     * Builds a request from an event, stamping it with the current time.
     *
     * @param event source of attempt id, capability, hosts and racks
     * @param priority priority of the request
     */
    public ContainerRequest(ContainerRequestEvent event, Priority priority) {
      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
          event.getRacks(), priority);
    }

    /**
     * Builds a request from an event with an explicit creation time.
     *
     * @param event source of attempt id, capability, hosts and racks
     * @param priority priority of the request
     * @param requestTimeMs creation time of the request, in milliseconds
     */
    public ContainerRequest(ContainerRequestEvent event, Priority priority,
        long requestTimeMs) {
      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
          event.getRacks(), priority, requestTimeMs);
    }

    /**
     * Builds a request from its parts, stamping it with
     * {@link System#currentTimeMillis()}.
     *
     * @param attemptID task attempt the container is requested for
     * @param capability requested resource capability
     * @param hosts preferred hosts
     * @param racks preferred racks
     * @param priority priority of the request
     */
    public ContainerRequest(TaskAttemptId attemptID,
        Resource capability, String[] hosts, String[] racks,
        Priority priority) {
      this(attemptID, capability, hosts, racks, priority,
          System.currentTimeMillis());
    }

    /**
     * Builds a request from its parts with an explicit creation time.
     *
     * @param attemptID task attempt the container is requested for
     * @param capability requested resource capability
     * @param hosts preferred hosts
     * @param racks preferred racks
     * @param priority priority of the request
     * @param requestTimeMs creation time of the request, in milliseconds
     */
    public ContainerRequest(TaskAttemptId attemptID,
        Resource capability, String[] hosts, String[] racks,
        Priority priority, long requestTimeMs) {
      this.attemptID = attemptID;
      this.capability = capability;
      this.hosts = hosts;
      this.racks = racks;
      this.priority = priority;
      this.requestTimeMs = requestTimeMs;
    }

    /**
     * @return the attempt id, capability and priority of this request in the
     *         form {@code AttemptId[..]Capability[..]Priority[..]}
     */
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("AttemptId[").append(attemptID).append("]");
      sb.append("Capability[").append(capability).append("]");
      sb.append("Priority[").append(priority).append("]");
      return sb.toString();
    }
  }

  /**
   * Reads the blacklisting settings from the configuration after delegating
   * to the superclass.
   *
   * @param conf configuration to read the settings from
   * @throws YarnRuntimeException if the configured blacklist-disable
   *         percentage is below -1 or above 100
   * @throws Exception if the superclass initialization fails
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
    nodeBlacklistingEnabled =
        conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
    LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
    maxTaskFailuresPerNode =
        conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
    blacklistDisablePercent =
        conf.getInt(
            MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
            MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
    LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
      throw new YarnRuntimeException("Invalid blacklistDisablePercent: "
          + blacklistDisablePercent
          + ". Should be an integer between 0 and 100 or -1 to disabled");
    }
    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
  }

  /**
   * Sends the pending asks, releases and blacklist changes to the scheduler
   * in a single allocate call and records the bookkeeping values from the
   * response (response id, available resources, cluster node count).
   *
   * <p>After the call the pending ask, release and blacklist-delta sets are
   * cleared. If the response reports completed containers, all request
   * limits are flagged so they are re-applied on the next call.
   *
   * @return the response returned by the scheduler
   * @throws YarnException if the allocate call fails
   * @throws IOException if the allocate call fails
   */
  protected AllocateResponse makeRemoteRequest() throws YarnException,
      IOException {
    applyRequestLimits();
    ResourceBlacklistRequest blacklistRequest =
        ResourceBlacklistRequest.newInstance(
            new ArrayList<String>(blacklistAdditions),
            new ArrayList<String>(blacklistRemovals));
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
            super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
            new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
    lastResponseID = allocateResponse.getResponseId();
    availableResources = allocateResponse.getAvailableResources();
    lastClusterNmCount = clusterNmCount;
    clusterNmCount = allocateResponse.getNumClusterNodes();
    int numCompletedContainers =
        allocateResponse.getCompletedContainersStatuses().size();

    if (ask.size() > 0 || release.size() > 0) {
      LOG.info("getResources() for " + applicationId + ":" + " ask="
          + ask.size() + " release= " + release.size() + " newContainers="
          + allocateResponse.getAllocatedContainers().size()
          + " finishedContainers=" + numCompletedContainers
          + " resourcelimit=" + availableResources + " knownNMs="
          + clusterNmCount);
    }

    ask.clear();
    release.clear();

    if (numCompletedContainers > 0) {
      // re-send limited requests when a container completes to trigger asking
      // for more containers
      requestLimitsToUpdate.addAll(requestLimits.keySet());
    }

    if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
      LOG.info("Update the blacklist for " + applicationId +
          ": blacklistAdditions=" + blacklistAdditions.size() +
          " blacklistRemovals=" + blacklistRemovals.size());
    }
    blacklistAdditions.clear();
    blacklistRemovals.clear();
    return allocateResponse;
  }

  /**
   * Caps the pending ANY-level asks at their registered limits.
   *
   * <p>For each limit, the ANY-level request with the same priority and
   * capability is looked up in the request table. If that request is
   * currently queued in {@code ask}, or the limit is flagged for update, the
   * ask is replaced by the limit when the request wants more containers than
   * the limit allows, and by the request itself otherwise. Limits equal to
   * {@link Integer#MAX_VALUE} (i.e. "no limit") are dropped once processed.
   */
  private void applyRequestLimits() {
    Iterator<ResourceRequest> iter = requestLimits.values().iterator();
    while (iter.hasNext()) {
      ResourceRequest reqLimit = iter.next();
      int limit = reqLimit.getNumContainers();
      Map<String, Map<Resource, ResourceRequest>> remoteRequests =
          remoteRequestsTable.get(reqLimit.getPriority());
      Map<Resource, ResourceRequest> reqMap = (remoteRequests != null)
          ? remoteRequests.get(ResourceRequest.ANY) : null;
      ResourceRequest req = (reqMap != null)
          ? reqMap.get(reqLimit.getCapability()) : null;
      if (req == null) {
        continue;
      }
      // update an existing ask or send a new one if updating
      if (ask.remove(req) || requestLimitsToUpdate.contains(req)) {
        ResourceRequest newReq = req.getNumContainers() > limit
            ? reqLimit : req;
        ask.add(newReq);
        LOG.info("Applying ask limit of " + newReq.getNumContainers()
            + " for priority:" + reqLimit.getPriority()
            + " and capability:" + reqLimit.getCapability());
      }
      if (limit == Integer.MAX_VALUE) {
        iter.remove();
      }
    }
    requestLimitsToUpdate.clear();
  }

  /**
   * Re-queues all outstanding state for the next allocate call: every
   * request in the request table, the blacklisted nodes (unless blacklisting
   * is currently being ignored), all pending releases and all request
   * limits.
   */
  protected void addOutstandingRequestOnResync() {
    for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
        .values()) {
      for (Map<Resource, ResourceRequest> capabilities : rr.values()) {
        for (ResourceRequest request : capabilities.values()) {
          addResourceRequestToAsk(request);
        }
      }
    }
    if (!ignoreBlacklisting.get()) {
      blacklistAdditions.addAll(blacklistedNodes);
    }
    if (!pendingRelease.isEmpty()) {
      release.addAll(pendingRelease);
    }
    requestLimitsToUpdate.addAll(requestLimits.keySet());
  }

  /**
   * Recomputes whether blacklisting should be ignored, based on the
   * percentage of known nodes that are blacklisted.
   *
   * <p>Nothing is done when blacklisting is disabled, when the threshold is
   * -1, or when neither the blacklist size nor the cluster node count has
   * changed since the last computation. When the flag flips, the blacklist
   * deltas for the next allocate call are rewritten accordingly.
   *
   * <p>May be incorrect if there's multiple NodeManagers running on a single
   * host. knownNodeCount is based on node managers, not hosts. blacklisting
   * is currently based on hosts.
   */
  protected void computeIgnoreBlacklisting() {
    if (!nodeBlacklistingEnabled) {
      return;
    }
    if (blacklistDisablePercent != -1
        && (blacklistedNodeCount != blacklistedNodes.size() ||
            clusterNmCount != lastClusterNmCount)) {
      blacklistedNodeCount = blacklistedNodes.size();
      if (clusterNmCount == 0) {
        LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
        return;
      }
      int val = (int) ((float) blacklistedNodes.size() / clusterNmCount * 100);
      if (val >= blacklistDisablePercent) {
        if (ignoreBlacklisting.compareAndSet(false, true)) {
          LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
              + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
          // notify RM to ignore all the blacklisted nodes
          blacklistAdditions.clear();
          blacklistRemovals.addAll(blacklistedNodes);
        }
      } else {
        if (ignoreBlacklisting.compareAndSet(true, false)) {
          LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
              + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
          // notify RM of all the blacklisted nodes
          blacklistAdditions.addAll(blacklistedNodes);
          blacklistRemovals.clear();
        }
      }
    }
  }

  /**
   * Records a container failure on a host and blacklists the host once its
   * failure count reaches the configured maximum.
   *
   * <p>On blacklisting, the host's requests are withdrawn: requests still
   * queued in {@code ask} are dropped from it, and for requests no longer
   * queued a zero-container request is queued instead. The host's entry is
   * removed from the request table only when all of its requests were still
   * queued.
   *
   * @param hostName host on which the container failed
   */
  protected void containerFailedOnHost(String hostName) {
    if (!nodeBlacklistingEnabled) {
      return;
    }
    if (blacklistedNodes.contains(hostName)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Host " + hostName + " is already blacklisted.");
      }
      return; //already blacklisted
    }
    Integer failures = nodeFailures.remove(hostName);
    failures = failures == null ? Integer.valueOf(0) : failures;
    failures++;
    LOG.info(failures + " failures on node " + hostName);
    if (failures >= maxTaskFailuresPerNode) {
      blacklistedNodes.add(hostName);
      if (!ignoreBlacklisting.get()) {
        blacklistAdditions.add(hostName);
      }
      //Even if blacklisting is ignored, continue to remove the host from
      // the request table. The RM may have additional nodes it can allocate on.
      LOG.info("Blacklisted host " + hostName);

      //remove all the requests corresponding to this hostname
      for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
          : remoteRequestsTable.values()) {
        //remove from host if no pending allocations
        boolean foundAll = true;
        Map<Resource, ResourceRequest> reqMap = remoteRequests.get(hostName);
        if (reqMap != null) {
          for (ResourceRequest req : reqMap.values()) {
            if (!ask.remove(req)) {
              foundAll = false;
              // if ask already sent to RM, we can try and overwrite it if
              // possible. send a new ask to RM with numContainers
              // specified for the blacklisted host to be 0.
              ResourceRequest zeroedRequest =
                  ResourceRequest.newInstance(req.getPriority(),
                      req.getResourceName(), req.getCapability(),
                      req.getNumContainers(), req.getRelaxLocality());

              zeroedRequest.setNumContainers(0);
              // to be sent to RM on next heartbeat
              addResourceRequestToAsk(zeroedRequest);
            }
          }
          // if all requests were still in ask queue
          // we can remove this request
          if (foundAll) {
            remoteRequests.remove(hostName);
          }
        }
        // TODO handling of rack blacklisting
        // Removing from rack should be dependent on no. of failures within
        // the rack. Blacklisting a rack on the basis of a single node's
        // blacklisting may be overly aggressive.
        // Node failures could be co-related with other failures on the same
        // rack but we probably need a better approach at trying to decide how
        // and when to blacklist a rack
      }
    } else {
      nodeFailures.put(hostName, failures);
    }
  }

  /**
   * @return the available resources reported by the most recent allocate
   *         response, or {@code null} if no response has been recorded yet
   */
  protected Resource getAvailableResources() {
    return availableResources;
  }

  /**
   * Adds one container to the outstanding requests for every
   * non-blacklisted host, every rack and the ANY resource name of the given
   * request.
   *
   * @param req container request to register
   */
  protected void addContainerReq(ContainerRequest req) {
    // Create resource requests
    for (String host : req.hosts) {
      // Data-local
      if (!isNodeBlacklisted(host)) {
        addResourceRequest(req.priority, host, req.capability);
      }
    }

    // Nothing Rack-local for now
    for (String rack : req.racks) {
      addResourceRequest(req.priority, rack, req.capability);
    }

    // Off-switch
    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
  }

  /**
   * Removes one container from the outstanding requests for every host,
   * every rack and the ANY resource name of the given request.
   *
   * @param req container request to withdraw
   */
  protected void decContainerReq(ContainerRequest req) {
    // Update resource requests
    for (String hostName : req.hosts) {
      decResourceRequest(req.priority, hostName, req.capability);
    }

    for (String rack : req.racks) {
      decResourceRequest(req.priority, rack, req.capability);
    }

    decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
  }

  /**
   * Increments the container count of the request identified by priority,
   * resource name and capability, creating the request (and the enclosing
   * table entries) if needed, and queues it for the next allocate call.
   *
   * @param priority priority of the request
   * @param resourceName host name, rack name or {@link ResourceRequest#ANY}
   * @param capability requested resource capability
   */
  private void addResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
        this.remoteRequestsTable.get(priority);
    if (remoteRequests == null) {
      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
      this.remoteRequestsTable.put(priority, remoteRequests);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Added priority=" + priority);
      }
    }
    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
    if (reqMap == null) {
      reqMap = new HashMap<Resource, ResourceRequest>();
      remoteRequests.put(resourceName, reqMap);
    }
    ResourceRequest remoteRequest = reqMap.get(capability);
    if (remoteRequest == null) {
      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
      remoteRequest.setPriority(priority);
      remoteRequest.setResourceName(resourceName);
      remoteRequest.setCapability(capability);
      remoteRequest.setNumContainers(0);
      reqMap.put(capability, remoteRequest);
    }
    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);

    // Note this down for next interaction with ResourceManager
    addResourceRequestToAsk(remoteRequest);
    if (LOG.isDebugEnabled()) {
      LOG.debug("addResourceRequest:" + " applicationId="
          + applicationId.getId() + " priority=" + priority.getPriority()
          + " resourceName=" + resourceName + " numContainers="
          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
    }
  }

  /**
   * Decrements the container count of the request identified by priority,
   * resource name and capability, and queues the updated request for the
   * next allocate call. A request that reaches zero containers is removed
   * from the request table (along with any table entries that become
   * empty), but is still queued so the zero count is sent.
   *
   * <p>If no matching request exists in the table, the call is a no-op.
   *
   * @param priority priority of the request
   * @param resourceName host name, rack name or {@link ResourceRequest#ANY}
   * @param capability requested resource capability
   */
  private void decResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
        this.remoteRequestsTable.get(priority);
    // The priority entry is dropped once its last request reaches zero, so
    // it may legitimately be missing here; treat that like a missing
    // resource name instead of dereferencing null.
    Map<Resource, ResourceRequest> reqMap = (remoteRequests != null)
        ? remoteRequests.get(resourceName) : null;
    if (reqMap == null) {
      // as we modify the resource requests by filtering out blacklisted hosts
      // when they are added, this value may be null when being
      // decremented
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not decrementing resource as " + resourceName
            + " is not present in request table");
      }
      return;
    }
    ResourceRequest remoteRequest = reqMap.get(capability);
    if (remoteRequest == null) {
      // nothing was ever requested for this capability on this resource name
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not decrementing resource as capability " + capability
            + " for " + resourceName + " is not present in request table");
      }
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
          + applicationId.getId() + " priority=" + priority.getPriority()
          + " resourceName=" + resourceName + " numContainers="
          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
    }

    if (remoteRequest.getNumContainers() > 0) {
      // based on blacklisting comments above we can end up decrementing more
      // than requested. so guard for that.
      remoteRequest.setNumContainers(remoteRequest.getNumContainers() - 1);
    }

    if (remoteRequest.getNumContainers() == 0) {
      reqMap.remove(capability);
      if (reqMap.size() == 0) {
        remoteRequests.remove(resourceName);
      }
      if (remoteRequests.size() == 0) {
        remoteRequestsTable.remove(priority);
      }
    }

    // send the updated resource request to RM
    // send 0 container count requests also to cancel previous requests
    addResourceRequestToAsk(remoteRequest);

    if (LOG.isDebugEnabled()) {
      LOG.debug("AFTER decResourceRequest:" + " applicationId="
          + applicationId.getId() + " priority=" + priority.getPriority()
          + " resourceName=" + resourceName + " numContainers="
          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
    }
  }

  /**
   * Queues a request for the next allocate call, replacing any queued
   * request the comparator considers equal to it.
   *
   * @param remoteRequest request to queue
   */
  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
    // because objects inside the resource map can be deleted ask can end up
    // containing an object that matches new resource object but with different
    // numContainers. So existing values must be replaced explicitly
    ask.remove(remoteRequest);
    ask.add(remoteRequest);
  }

  /**
   * Queues a container for release on the next allocate call.
   *
   * @param containerId id of the container to release
   */
  protected void release(ContainerId containerId) {
    release.add(containerId);
  }

  /**
   * @param hostname host to check
   * @return {@code true} if the host is blacklisted and blacklisting is both
   *         enabled and not currently being ignored; {@code false} otherwise
   */
  protected boolean isNodeBlacklisted(String hostname) {
    if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
      return false;
    }
    return blacklistedNodes.contains(hostname);
  }

  /**
   * Returns a copy of the given request with blacklisted hosts removed. The
   * copy is created through the constructor that stamps the current time, so
   * it does not carry over the original's request time.
   *
   * @param orig request to filter
   * @return a new request with the same attempt id, capability, racks and
   *         priority, and only the non-blacklisted hosts
   */
  protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
    ArrayList<String> newHosts = new ArrayList<String>();
    for (String host : orig.hosts) {
      if (!isNodeBlacklisted(host)) {
        newHosts.add(host);
      }
    }
    String[] hosts = newHosts.toArray(new String[newHosts.size()]);
    ContainerRequest newReq = new ContainerRequest(orig.attemptID,
        orig.capability, hosts, orig.racks, orig.priority);
    return newReq;
  }

  /**
   * Registers a limit on the number of containers asked for at the ANY
   * level for the given priority and capability. The limit is flagged for
   * (re)sending when it is new or larger than the previous one.
   *
   * @param priority priority the limit applies to
   * @param capability capability the limit applies to
   * @param limit maximum number of containers to ask for; a negative value
   *        is treated as {@link Integer#MAX_VALUE}
   */
  protected void setRequestLimit(Priority priority, Resource capability,
      int limit) {
    if (limit < 0) {
      limit = Integer.MAX_VALUE;
    }
    ResourceRequest newReqLimit = ResourceRequest.newInstance(priority,
        ResourceRequest.ANY, capability, limit);
    ResourceRequest oldReqLimit = requestLimits.put(newReqLimit, newReqLimit);
    if (oldReqLimit == null || oldReqLimit.getNumContainers() < limit) {
      requestLimitsToUpdate.add(newReqLimit);
    }
  }

  /**
   * @return the live set of blacklisted host names (not a copy)
   */
  public Set<String> getBlacklistedNodes() {
    return blacklistedNodes;
  }
}