1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.mapreduce.v2.app.rm;
20 
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.TreeMap;
29 import java.util.TreeSet;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.classification.InterfaceAudience.Private;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.mapreduce.MRJobConfig;
38 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
39 import org.apache.hadoop.mapreduce.v2.app.AppContext;
40 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
41 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
42 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
43 import org.apache.hadoop.yarn.api.records.ContainerId;
44 import org.apache.hadoop.yarn.api.records.Priority;
45 import org.apache.hadoop.yarn.api.records.Resource;
46 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
47 import org.apache.hadoop.yarn.api.records.ResourceRequest;
48 import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
49 import org.apache.hadoop.yarn.exceptions.YarnException;
50 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
51 import org.apache.hadoop.yarn.factories.RecordFactory;
52 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
53 
54 import com.google.common.annotations.VisibleForTesting;
55 
56 
57 /**
58  * Keeps the data structures to send container requests to RM.
59  */
60 public abstract class RMContainerRequestor extends RMCommunicator {
61 
62   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
63   private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR =
64       new ResourceRequestComparator();
65 
66   protected int lastResponseID;
67   private Resource availableResources;
68 
69   private final RecordFactory recordFactory =
70       RecordFactoryProvider.getRecordFactory(null);
71   //Key -> Priority
72   //Value -> Map
73   //Key->ResourceName (e.g., hostname, rackname, *)
74   //Value->Map
75   //Key->Resource Capability
76   //Value->ResourceRequest
77   private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
78   remoteRequestsTable =
79       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
80 
  // Use a custom comparator to make sure ResourceRequest objects differing
  // only in numContainers don't end up as duplicates.
83   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
84       RESOURCE_REQUEST_COMPARATOR);
85   private final Set<ContainerId> release = new TreeSet<ContainerId>();
  // pendingRelease holds the history of release requests; a request is
  // removed only once the RM sends the matching completedContainer.
  // How is it different from release? --> release only holds the containers
  // to be released in a single allocate() request.
89   protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
90 
91   private final Map<ResourceRequest,ResourceRequest> requestLimits =
92       new TreeMap<ResourceRequest,ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
93   private final Set<ResourceRequest> requestLimitsToUpdate =
94       new TreeSet<ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
95 
96   private boolean nodeBlacklistingEnabled;
97   private int blacklistDisablePercent;
98   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
99   private int blacklistedNodeCount = 0;
100   private int lastClusterNmCount = 0;
101   private int clusterNmCount = 0;
102   private int maxTaskFailuresPerNode;
103   private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
104   private final Set<String> blacklistedNodes = Collections
105       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
106   private final Set<String> blacklistAdditions = Collections
107       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
108   private final Set<String> blacklistRemovals = Collections
109       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
110 
RMContainerRequestor(ClientService clientService, AppContext context)111   public RMContainerRequestor(ClientService clientService, AppContext context) {
112     super(clientService, context);
113   }
114 
115   @Private
116   @VisibleForTesting
117   static class ContainerRequest {
118     final TaskAttemptId attemptID;
119     final Resource capability;
120     final String[] hosts;
121     final String[] racks;
122     //final boolean earlierAttemptFailed;
123     final Priority priority;
124     /**
125      * the time when this request object was formed; can be used to avoid
126      * aggressive preemption for recently placed requests
127      */
128     final long requestTimeMs;
129 
ContainerRequest(ContainerRequestEvent event, Priority priority)130     public ContainerRequest(ContainerRequestEvent event, Priority priority) {
131       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
132           event.getRacks(), priority);
133     }
134 
ContainerRequest(ContainerRequestEvent event, Priority priority, long requestTimeMs)135     public ContainerRequest(ContainerRequestEvent event, Priority priority,
136                             long requestTimeMs) {
137       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
138           event.getRacks(), priority, requestTimeMs);
139     }
140 
ContainerRequest(TaskAttemptId attemptID, Resource capability, String[] hosts, String[] racks, Priority priority)141     public ContainerRequest(TaskAttemptId attemptID,
142                             Resource capability, String[] hosts, String[] racks,
143                             Priority priority) {
144       this(attemptID, capability, hosts, racks, priority,
145           System.currentTimeMillis());
146     }
147 
ContainerRequest(TaskAttemptId attemptID, Resource capability, String[] hosts, String[] racks, Priority priority, long requestTimeMs)148     public ContainerRequest(TaskAttemptId attemptID,
149         Resource capability, String[] hosts, String[] racks,
150         Priority priority, long requestTimeMs) {
151       this.attemptID = attemptID;
152       this.capability = capability;
153       this.hosts = hosts;
154       this.racks = racks;
155       this.priority = priority;
156       this.requestTimeMs = requestTimeMs;
157     }
158 
toString()159     public String toString() {
160       StringBuilder sb = new StringBuilder();
161       sb.append("AttemptId[").append(attemptID).append("]");
162       sb.append("Capability[").append(capability).append("]");
163       sb.append("Priority[").append(priority).append("]");
164       return sb.toString();
165     }
166   }
167 
168   @Override
serviceInit(Configuration conf)169   protected void serviceInit(Configuration conf) throws Exception {
170     super.serviceInit(conf);
171     nodeBlacklistingEnabled =
172       conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
173     LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
174     maxTaskFailuresPerNode =
175       conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
176     blacklistDisablePercent =
177         conf.getInt(
178             MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
179             MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
180     LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
181     if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
182       throw new YarnRuntimeException("Invalid blacklistDisablePercent: "
183           + blacklistDisablePercent
184           + ". Should be an integer between 0 and 100 or -1 to disabled");
185     }
186     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
187   }
188 
makeRemoteRequest()189   protected AllocateResponse makeRemoteRequest() throws YarnException,
190       IOException {
191     applyRequestLimits();
192     ResourceBlacklistRequest blacklistRequest =
193         ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
194             new ArrayList<String>(blacklistRemovals));
195     AllocateRequest allocateRequest =
196         AllocateRequest.newInstance(lastResponseID,
197           super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
198           new ArrayList<ContainerId>(release), blacklistRequest);
199     AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
200     lastResponseID = allocateResponse.getResponseId();
201     availableResources = allocateResponse.getAvailableResources();
202     lastClusterNmCount = clusterNmCount;
203     clusterNmCount = allocateResponse.getNumClusterNodes();
204     int numCompletedContainers =
205         allocateResponse.getCompletedContainersStatuses().size();
206 
207     if (ask.size() > 0 || release.size() > 0) {
208       LOG.info("getResources() for " + applicationId + ":" + " ask="
209           + ask.size() + " release= " + release.size() + " newContainers="
210           + allocateResponse.getAllocatedContainers().size()
211           + " finishedContainers=" + numCompletedContainers
212           + " resourcelimit=" + availableResources + " knownNMs="
213           + clusterNmCount);
214     }
215 
216     ask.clear();
217     release.clear();
218 
219     if (numCompletedContainers > 0) {
220       // re-send limited requests when a container completes to trigger asking
221       // for more containers
222       requestLimitsToUpdate.addAll(requestLimits.keySet());
223     }
224 
225     if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
226       LOG.info("Update the blacklist for " + applicationId +
227           ": blacklistAdditions=" + blacklistAdditions.size() +
228           " blacklistRemovals=" +  blacklistRemovals.size());
229     }
230     blacklistAdditions.clear();
231     blacklistRemovals.clear();
232     return allocateResponse;
233   }
234 
applyRequestLimits()235   private void applyRequestLimits() {
236     Iterator<ResourceRequest> iter = requestLimits.values().iterator();
237     while (iter.hasNext()) {
238       ResourceRequest reqLimit = iter.next();
239       int limit = reqLimit.getNumContainers();
240       Map<String, Map<Resource, ResourceRequest>> remoteRequests =
241           remoteRequestsTable.get(reqLimit.getPriority());
242       Map<Resource, ResourceRequest> reqMap = (remoteRequests != null)
243           ? remoteRequests.get(ResourceRequest.ANY) : null;
244       ResourceRequest req = (reqMap != null)
245           ? reqMap.get(reqLimit.getCapability()) : null;
246       if (req == null) {
247         continue;
248       }
249       // update an existing ask or send a new one if updating
250       if (ask.remove(req) || requestLimitsToUpdate.contains(req)) {
251         ResourceRequest newReq = req.getNumContainers() > limit
252             ? reqLimit : req;
253         ask.add(newReq);
254         LOG.info("Applying ask limit of " + newReq.getNumContainers()
255             + " for priority:" + reqLimit.getPriority()
256             + " and capability:" + reqLimit.getCapability());
257       }
258       if (limit == Integer.MAX_VALUE) {
259         iter.remove();
260       }
261     }
262     requestLimitsToUpdate.clear();
263   }
264 
addOutstandingRequestOnResync()265   protected void addOutstandingRequestOnResync() {
266     for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
267         .values()) {
268       for (Map<Resource, ResourceRequest> capabalities : rr.values()) {
269         for (ResourceRequest request : capabalities.values()) {
270           addResourceRequestToAsk(request);
271         }
272       }
273     }
274     if (!ignoreBlacklisting.get()) {
275       blacklistAdditions.addAll(blacklistedNodes);
276     }
277     if (!pendingRelease.isEmpty()) {
278       release.addAll(pendingRelease);
279     }
280     requestLimitsToUpdate.addAll(requestLimits.keySet());
281   }
282 
283   // May be incorrect if there's multiple NodeManagers running on a single host.
284   // knownNodeCount is based on node managers, not hosts. blacklisting is
285   // currently based on hosts.
computeIgnoreBlacklisting()286   protected void computeIgnoreBlacklisting() {
287     if (!nodeBlacklistingEnabled) {
288       return;
289     }
290     if (blacklistDisablePercent != -1
291         && (blacklistedNodeCount != blacklistedNodes.size() ||
292             clusterNmCount != lastClusterNmCount)) {
293       blacklistedNodeCount = blacklistedNodes.size();
294       if (clusterNmCount == 0) {
295         LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
296         return;
297       }
298       int val = (int) ((float) blacklistedNodes.size() / clusterNmCount * 100);
299       if (val >= blacklistDisablePercent) {
300         if (ignoreBlacklisting.compareAndSet(false, true)) {
301           LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
302               + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
303           // notify RM to ignore all the blacklisted nodes
304           blacklistAdditions.clear();
305           blacklistRemovals.addAll(blacklistedNodes);
306         }
307       } else {
308         if (ignoreBlacklisting.compareAndSet(true, false)) {
309           LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
310               + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
311           // notify RM of all the blacklisted nodes
312           blacklistAdditions.addAll(blacklistedNodes);
313           blacklistRemovals.clear();
314         }
315       }
316     }
317   }
318 
containerFailedOnHost(String hostName)319   protected void containerFailedOnHost(String hostName) {
320     if (!nodeBlacklistingEnabled) {
321       return;
322     }
323     if (blacklistedNodes.contains(hostName)) {
324       if (LOG.isDebugEnabled()) {
325         LOG.debug("Host " + hostName + " is already blacklisted.");
326       }
327       return; //already blacklisted
328     }
329     Integer failures = nodeFailures.remove(hostName);
330     failures = failures == null ? Integer.valueOf(0) : failures;
331     failures++;
332     LOG.info(failures + " failures on node " + hostName);
333     if (failures >= maxTaskFailuresPerNode) {
334       blacklistedNodes.add(hostName);
335       if (!ignoreBlacklisting.get()) {
336         blacklistAdditions.add(hostName);
337       }
338       //Even if blacklisting is ignored, continue to remove the host from
339       // the request table. The RM may have additional nodes it can allocate on.
340       LOG.info("Blacklisted host " + hostName);
341 
342       //remove all the requests corresponding to this hostname
343       for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
344           : remoteRequestsTable.values()){
345         //remove from host if no pending allocations
346         boolean foundAll = true;
347         Map<Resource, ResourceRequest> reqMap = remoteRequests.get(hostName);
348         if (reqMap != null) {
349           for (ResourceRequest req : reqMap.values()) {
350             if (!ask.remove(req)) {
351               foundAll = false;
352               // if ask already sent to RM, we can try and overwrite it if possible.
353               // send a new ask to RM with numContainers
354               // specified for the blacklisted host to be 0.
355               ResourceRequest zeroedRequest =
356                   ResourceRequest.newInstance(req.getPriority(),
357                     req.getResourceName(), req.getCapability(),
358                     req.getNumContainers(), req.getRelaxLocality());
359 
360               zeroedRequest.setNumContainers(0);
361               // to be sent to RM on next heartbeat
362               addResourceRequestToAsk(zeroedRequest);
363             }
364           }
365           // if all requests were still in ask queue
366           // we can remove this request
367           if (foundAll) {
368             remoteRequests.remove(hostName);
369           }
370         }
371         // TODO handling of rack blacklisting
372         // Removing from rack should be dependent on no. of failures within the rack
373         // Blacklisting a rack on the basis of a single node's blacklisting
374         // may be overly aggressive.
375         // Node failures could be co-related with other failures on the same rack
376         // but we probably need a better approach at trying to decide how and when
377         // to blacklist a rack
378       }
379     } else {
380       nodeFailures.put(hostName, failures);
381     }
382   }
383 
getAvailableResources()384   protected Resource getAvailableResources() {
385     return availableResources;
386   }
387 
addContainerReq(ContainerRequest req)388   protected void addContainerReq(ContainerRequest req) {
389     // Create resource requests
390     for (String host : req.hosts) {
391       // Data-local
392       if (!isNodeBlacklisted(host)) {
393         addResourceRequest(req.priority, host, req.capability);
394       }
395     }
396 
397     // Nothing Rack-local for now
398     for (String rack : req.racks) {
399       addResourceRequest(req.priority, rack, req.capability);
400     }
401 
402     // Off-switch
403     addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
404   }
405 
decContainerReq(ContainerRequest req)406   protected void decContainerReq(ContainerRequest req) {
407     // Update resource requests
408     for (String hostName : req.hosts) {
409       decResourceRequest(req.priority, hostName, req.capability);
410     }
411 
412     for (String rack : req.racks) {
413       decResourceRequest(req.priority, rack, req.capability);
414     }
415 
416     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
417   }
418 
addResourceRequest(Priority priority, String resourceName, Resource capability)419   private void addResourceRequest(Priority priority, String resourceName,
420       Resource capability) {
421     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
422       this.remoteRequestsTable.get(priority);
423     if (remoteRequests == null) {
424       remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
425       this.remoteRequestsTable.put(priority, remoteRequests);
426       if (LOG.isDebugEnabled()) {
427         LOG.debug("Added priority=" + priority);
428       }
429     }
430     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
431     if (reqMap == null) {
432       reqMap = new HashMap<Resource, ResourceRequest>();
433       remoteRequests.put(resourceName, reqMap);
434     }
435     ResourceRequest remoteRequest = reqMap.get(capability);
436     if (remoteRequest == null) {
437       remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
438       remoteRequest.setPriority(priority);
439       remoteRequest.setResourceName(resourceName);
440       remoteRequest.setCapability(capability);
441       remoteRequest.setNumContainers(0);
442       reqMap.put(capability, remoteRequest);
443     }
444     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
445 
446     // Note this down for next interaction with ResourceManager
447     addResourceRequestToAsk(remoteRequest);
448     if (LOG.isDebugEnabled()) {
449       LOG.debug("addResourceRequest:" + " applicationId="
450           + applicationId.getId() + " priority=" + priority.getPriority()
451           + " resourceName=" + resourceName + " numContainers="
452           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
453     }
454   }
455 
decResourceRequest(Priority priority, String resourceName, Resource capability)456   private void decResourceRequest(Priority priority, String resourceName,
457       Resource capability) {
458     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
459       this.remoteRequestsTable.get(priority);
460     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
461     if (reqMap == null) {
462       // as we modify the resource requests by filtering out blacklisted hosts
463       // when they are added, this value may be null when being
464       // decremented
465       if (LOG.isDebugEnabled()) {
466         LOG.debug("Not decrementing resource as " + resourceName
467             + " is not present in request table");
468       }
469       return;
470     }
471     ResourceRequest remoteRequest = reqMap.get(capability);
472 
473     if (LOG.isDebugEnabled()) {
474       LOG.debug("BEFORE decResourceRequest:" + " applicationId="
475           + applicationId.getId() + " priority=" + priority.getPriority()
476           + " resourceName=" + resourceName + " numContainers="
477           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
478     }
479 
480     if(remoteRequest.getNumContainers() > 0) {
481       // based on blacklisting comments above we can end up decrementing more
482       // than requested. so guard for that.
483       remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
484     }
485 
486     if (remoteRequest.getNumContainers() == 0) {
487       reqMap.remove(capability);
488       if (reqMap.size() == 0) {
489         remoteRequests.remove(resourceName);
490       }
491       if (remoteRequests.size() == 0) {
492         remoteRequestsTable.remove(priority);
493       }
494     }
495 
496     // send the updated resource request to RM
497     // send 0 container count requests also to cancel previous requests
498     addResourceRequestToAsk(remoteRequest);
499 
500     if (LOG.isDebugEnabled()) {
501       LOG.info("AFTER decResourceRequest:" + " applicationId="
502           + applicationId.getId() + " priority=" + priority.getPriority()
503           + " resourceName=" + resourceName + " numContainers="
504           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
505     }
506   }
507 
addResourceRequestToAsk(ResourceRequest remoteRequest)508   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
509     // because objects inside the resource map can be deleted ask can end up
510     // containing an object that matches new resource object but with different
511     // numContainers. So existing values must be replaced explicitly
512     ask.remove(remoteRequest);
513     ask.add(remoteRequest);
514   }
515 
release(ContainerId containerId)516   protected void release(ContainerId containerId) {
517     release.add(containerId);
518   }
519 
isNodeBlacklisted(String hostname)520   protected boolean isNodeBlacklisted(String hostname) {
521     if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
522       return false;
523     }
524     return blacklistedNodes.contains(hostname);
525   }
526 
getFilteredContainerRequest(ContainerRequest orig)527   protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
528     ArrayList<String> newHosts = new ArrayList<String>();
529     for (String host : orig.hosts) {
530       if (!isNodeBlacklisted(host)) {
531         newHosts.add(host);
532       }
533     }
534     String[] hosts = newHosts.toArray(new String[newHosts.size()]);
535     ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
536         hosts, orig.racks, orig.priority);
537     return newReq;
538   }
539 
setRequestLimit(Priority priority, Resource capability, int limit)540   protected void setRequestLimit(Priority priority, Resource capability,
541       int limit) {
542     if (limit < 0) {
543       limit = Integer.MAX_VALUE;
544     }
545     ResourceRequest newReqLimit = ResourceRequest.newInstance(priority,
546         ResourceRequest.ANY, capability, limit);
547     ResourceRequest oldReqLimit = requestLimits.put(newReqLimit, newReqLimit);
548     if (oldReqLimit == null || oldReqLimit.getNumContainers() < limit) {
549       requestLimitsToUpdate.add(newReqLimit);
550     }
551   }
552 
getBlacklistedNodes()553   public Set<String> getBlacklistedNodes() {
554     return blacklistedNodes;
555   }
556 }
557