/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
18 
19 package org.apache.hadoop.yarn.client.api;
20 
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.List;
24 
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.classification.InterfaceAudience;
28 import org.apache.hadoop.classification.InterfaceAudience.Private;
29 import org.apache.hadoop.classification.InterfaceAudience.Public;
30 import org.apache.hadoop.classification.InterfaceStability;
31 import org.apache.hadoop.service.AbstractService;
32 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
33 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
34 import org.apache.hadoop.yarn.api.records.ContainerId;
35 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
36 import org.apache.hadoop.yarn.api.records.Priority;
37 import org.apache.hadoop.yarn.api.records.Resource;
38 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
39 import org.apache.hadoop.yarn.exceptions.YarnException;
40 
41 import com.google.common.base.Preconditions;
42 import com.google.common.base.Supplier;
43 import com.google.common.collect.ImmutableList;
44 
45 @InterfaceAudience.Public
46 @InterfaceStability.Stable
47 public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
48     AbstractService {
49   private static final Log LOG = LogFactory.getLog(AMRMClient.class);
50 
51   /**
52    * Create a new instance of AMRMClient.
53    * For usage:
54    * <pre>
55    * {@code
56    * AMRMClient.<T>createAMRMClientContainerRequest()
57    * }</pre>
58    * @return the newly create AMRMClient instance.
59    */
60   @Public
createAMRMClient()61   public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
62     AMRMClient<T> client = new AMRMClientImpl<T>();
63     return client;
64   }
65 
66   private NMTokenCache nmTokenCache;
67 
68   @Private
AMRMClient(String name)69   protected AMRMClient(String name) {
70     super(name);
71     nmTokenCache = NMTokenCache.getSingleton();
72   }
73 
74   /**
75    * Object to represent a single container request for resources. Scheduler
76    * documentation should be consulted for the specifics of how the parameters
77    * are honored.
78    *
79    * By default, YARN schedulers try to allocate containers at the requested
80    * locations but they may relax the constraints in order to expedite meeting
81    * allocations limits. They first relax the constraint to the same rack as the
82    * requested node and then to anywhere in the cluster. The relaxLocality flag
83    * may be used to disable locality relaxation and request containers at only
84    * specific locations. The following conditions apply.
85    * <ul>
86    * <li>Within a priority, all container requests must have the same value for
87    * locality relaxation. Either enabled or disabled.</li>
88    * <li>If locality relaxation is disabled, then across requests, locations at
89    * different network levels may not be specified. E.g. its invalid to make a
90    * request for a specific node and another request for a specific rack.</li>
91    * <li>If locality relaxation is disabled, then only within the same request,
92    * a node and its rack may be specified together. This allows for a specific
93    * rack with a preference for a specific node within that rack.</li>
94    * <li></li>
95    * </ul>
96    * To re-enable locality relaxation at a given priority, all pending requests
97    * with locality relaxation disabled must be first removed. Then they can be
98    * added back with locality relaxation enabled.
99    *
100    * All getters return immutable values.
101    */
102   public static class ContainerRequest {
103     final Resource capability;
104     final List<String> nodes;
105     final List<String> racks;
106     final Priority priority;
107     final boolean relaxLocality;
108     final String nodeLabelsExpression;
109 
110     /**
111      * Instantiates a {@link ContainerRequest} with the given constraints and
112      * locality relaxation enabled.
113      *
114      * @param capability
115      *          The {@link Resource} to be requested for each container.
116      * @param nodes
117      *          Any hosts to request that the containers are placed on.
118      * @param racks
119      *          Any racks to request that the containers are placed on. The
120      *          racks corresponding to any hosts requested will be automatically
121      *          added to this list.
122      * @param priority
123      *          The priority at which to request the containers. Higher
124      *          priorities have lower numerical values.
125      */
ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority)126     public ContainerRequest(Resource capability, String[] nodes,
127         String[] racks, Priority priority) {
128       this(capability, nodes, racks, priority, true, null);
129     }
130 
131     /**
132      * Instantiates a {@link ContainerRequest} with the given constraints.
133      *
134      * @param capability
135      *          The {@link Resource} to be requested for each container.
136      * @param nodes
137      *          Any hosts to request that the containers are placed on.
138      * @param racks
139      *          Any racks to request that the containers are placed on. The
140      *          racks corresponding to any hosts requested will be automatically
141      *          added to this list.
142      * @param priority
143      *          The priority at which to request the containers. Higher
144      *          priorities have lower numerical values.
145      * @param relaxLocality
146      *          If true, containers for this request may be assigned on hosts
147      *          and racks other than the ones explicitly requested.
148      */
ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality)149     public ContainerRequest(Resource capability, String[] nodes,
150         String[] racks, Priority priority, boolean relaxLocality) {
151       this(capability, nodes, racks, priority, relaxLocality, null);
152     }
153 
154     /**
155      * Instantiates a {@link ContainerRequest} with the given constraints.
156      *
157      * @param capability
158      *          The {@link Resource} to be requested for each container.
159      * @param nodes
160      *          Any hosts to request that the containers are placed on.
161      * @param racks
162      *          Any racks to request that the containers are placed on. The
163      *          racks corresponding to any hosts requested will be automatically
164      *          added to this list.
165      * @param priority
166      *          The priority at which to request the containers. Higher
167      *          priorities have lower numerical values.
168      * @param relaxLocality
169      *          If true, containers for this request may be assigned on hosts
170      *          and racks other than the ones explicitly requested.
171      * @param nodeLabelsExpression
172      *          Set node labels to allocate resource, now we only support
173      *          asking for only a single node label
174      */
ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality, String nodeLabelsExpression)175     public ContainerRequest(Resource capability, String[] nodes,
176         String[] racks, Priority priority, boolean relaxLocality,
177         String nodeLabelsExpression) {
178       // Validate request
179       Preconditions.checkArgument(capability != null,
180           "The Resource to be requested for each container " +
181               "should not be null ");
182       Preconditions.checkArgument(priority != null,
183           "The priority at which to request containers should not be null ");
184       Preconditions.checkArgument(
185               !(!relaxLocality && (racks == null || racks.length == 0)
186                   && (nodes == null || nodes.length == 0)),
187               "Can't turn off locality relaxation on a " +
188               "request with no location constraints");
189       this.capability = capability;
190       this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
191       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
192       this.priority = priority;
193       this.relaxLocality = relaxLocality;
194       this.nodeLabelsExpression = nodeLabelsExpression;
195     }
196 
getCapability()197     public Resource getCapability() {
198       return capability;
199     }
200 
getNodes()201     public List<String> getNodes() {
202       return nodes;
203     }
204 
getRacks()205     public List<String> getRacks() {
206       return racks;
207     }
208 
getPriority()209     public Priority getPriority() {
210       return priority;
211     }
212 
getRelaxLocality()213     public boolean getRelaxLocality() {
214       return relaxLocality;
215     }
216 
getNodeLabelExpression()217     public String getNodeLabelExpression() {
218       return nodeLabelsExpression;
219     }
220 
toString()221     public String toString() {
222       StringBuilder sb = new StringBuilder();
223       sb.append("Capability[").append(capability).append("]");
224       sb.append("Priority[").append(priority).append("]");
225       return sb.toString();
226     }
227   }
228 
229   /**
230    * Register the application master. This must be called before any
231    * other interaction
232    * @param appHostName Name of the host on which master is running
233    * @param appHostPort Port master is listening on
234    * @param appTrackingUrl URL at which the master info can be seen
235    * @return <code>RegisterApplicationMasterResponse</code>
236    * @throws YarnException
237    * @throws IOException
238    */
239   public abstract RegisterApplicationMasterResponse
registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl)240                registerApplicationMaster(String appHostName,
241                                          int appHostPort,
242                                          String appTrackingUrl)
243                throws YarnException, IOException;
244 
245   /**
246    * Request additional containers and receive new container allocations.
247    * Requests made via <code>addContainerRequest</code> are sent to the
248    * <code>ResourceManager</code>. New containers assigned to the master are
249    * retrieved. Status of completed containers and node health updates are also
250    * retrieved. This also doubles up as a heartbeat to the ResourceManager and
251    * must be made periodically. The call may not always return any new
252    * allocations of containers. App should not make concurrent allocate
253    * requests. May cause request loss.
254    *
255    * <p>
256    * Note : If the user has not removed container requests that have already
257    * been satisfied, then the re-register may end up sending the entire
258    * container requests to the RM (including matched requests). Which would mean
259    * the RM could end up giving it a lot of new allocated containers.
260    * </p>
261    *
262    * @param progressIndicator Indicates progress made by the master
263    * @return the response of the allocate request
264    * @throws YarnException
265    * @throws IOException
266    */
allocate(float progressIndicator)267   public abstract AllocateResponse allocate(float progressIndicator)
268                            throws YarnException, IOException;
269 
270   /**
271    * Unregister the application master. This must be called in the end.
272    * @param appStatus Success/Failure status of the master
273    * @param appMessage Diagnostics message on failure
274    * @param appTrackingUrl New URL to get master info
275    * @throws YarnException
276    * @throws IOException
277    */
unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl)278   public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
279                                            String appMessage,
280                                            String appTrackingUrl)
281                throws YarnException, IOException;
282 
283   /**
284    * Request containers for resources before calling <code>allocate</code>
285    * @param req Resource request
286    */
addContainerRequest(T req)287   public abstract void addContainerRequest(T req);
288 
289   /**
290    * Remove previous container request. The previous container request may have
291    * already been sent to the ResourceManager. So even after the remove request
292    * the app must be prepared to receive an allocation for the previous request
293    * even after the remove request
294    * @param req Resource request
295    */
removeContainerRequest(T req)296   public abstract void removeContainerRequest(T req);
297 
298   /**
299    * Release containers assigned by the Resource Manager. If the app cannot use
300    * the container or wants to give up the container then it can release them.
301    * The app needs to make new requests for the released resource capability if
302    * it still needs it. eg. it released non-local resources
303    * @param containerId
304    */
releaseAssignedContainer(ContainerId containerId)305   public abstract void releaseAssignedContainer(ContainerId containerId);
306 
307   /**
308    * Get the currently available resources in the cluster.
309    * A valid value is available after a call to allocate has been made
310    * @return Currently available resources
311    */
getAvailableResources()312   public abstract Resource getAvailableResources();
313 
314   /**
315    * Get the current number of nodes in the cluster.
316    * A valid values is available after a call to allocate has been made
317    * @return Current number of nodes in the cluster
318    */
getClusterNodeCount()319   public abstract int getClusterNodeCount();
320 
321   /**
322    * Get outstanding <code>ContainerRequest</code>s matching the given
323    * parameters. These ContainerRequests should have been added via
324    * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
325    * the AMRMClient may return its internal collection directly without creating
326    * a copy. Users should not perform mutable operations on the return value.
327    * Each collection in the list contains requests with identical
328    * <code>Resource</code> size that fit in the given capability. In a
329    * collection, requests will be returned in the same order as they were added.
330    * @return Collection of request matching the parameters
331    */
getMatchingRequests( Priority priority, String resourceName, Resource capability)332   public abstract List<? extends Collection<T>> getMatchingRequests(
333                                            Priority priority,
334                                            String resourceName,
335                                            Resource capability);
336 
337   /**
338    * Update application's blacklist with addition or removal resources.
339    *
340    * @param blacklistAdditions list of resources which should be added to the
341    *        application blacklist
342    * @param blacklistRemovals list of resources which should be removed from the
343    *        application blacklist
344    */
updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals)345   public abstract void updateBlacklist(List<String> blacklistAdditions,
346       List<String> blacklistRemovals);
347 
348   /**
349    * Set the NM token cache for the <code>AMRMClient</code>. This cache must
350    * be shared with the {@link NMClient} used to manage containers for the
351    * <code>AMRMClient</code>
352    * <p>
353    * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
354    * singleton instance will be used.
355    *
356    * @param nmTokenCache the NM token cache to use.
357    */
setNMTokenCache(NMTokenCache nmTokenCache)358   public void setNMTokenCache(NMTokenCache nmTokenCache) {
359     this.nmTokenCache = nmTokenCache;
360   }
361 
362   /**
363    * Get the NM token cache of the <code>AMRMClient</code>. This cache must be
364    * shared with the {@link NMClient} used to manage containers for the
365    * <code>AMRMClient</code>.
366    * <p>
367    * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
368    * singleton instance will be used.
369    *
370    * @return the NM token cache.
371    */
getNMTokenCache()372   public NMTokenCache getNMTokenCache() {
373     return nmTokenCache;
374   }
375 
376   /**
377    * Wait for <code>check</code> to return true for each 1000 ms.
378    * See also {@link #waitFor(com.google.common.base.Supplier, int)}
379    * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
380    * @param check
381    */
waitFor(Supplier<Boolean> check)382   public void waitFor(Supplier<Boolean> check) throws InterruptedException {
383     waitFor(check, 1000);
384   }
385 
386   /**
387    * Wait for <code>check</code> to return true for each
388    * <code>checkEveryMillis</code> ms.
389    * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
390    * @param check user defined checker
391    * @param checkEveryMillis interval to call <code>check</code>
392    */
waitFor(Supplier<Boolean> check, int checkEveryMillis)393   public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
394       throws InterruptedException {
395     waitFor(check, checkEveryMillis, 1);
396   }
397 
398   /**
399    * Wait for <code>check</code> to return true for each
400    * <code>checkEveryMillis</code> ms. In the main loop, this method will log
401    * the message "waiting in main loop" for each <code>logInterval</code> times
402    * iteration to confirm the thread is alive.
403    * @param check user defined checker
404    * @param checkEveryMillis interval to call <code>check</code>
405    * @param logInterval interval to log for each
406    */
waitFor(Supplier<Boolean> check, int checkEveryMillis, int logInterval)407   public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
408       int logInterval) throws InterruptedException {
409     Preconditions.checkNotNull(check, "check should not be null");
410     Preconditions.checkArgument(checkEveryMillis >= 0,
411         "checkEveryMillis should be positive value");
412     Preconditions.checkArgument(logInterval >= 0,
413         "logInterval should be positive value");
414 
415     int loggingCounter = logInterval;
416     do {
417       if (LOG.isDebugEnabled()) {
418         LOG.debug("Check the condition for main loop.");
419       }
420 
421       boolean result = check.get();
422       if (result) {
423         LOG.info("Exits the main loop.");
424         return;
425       }
426       if (--loggingCounter <= 0) {
427         LOG.info("Waiting in main loop.");
428         loggingCounter = logInterval;
429       }
430 
431       Thread.sleep(checkEveryMillis);
432     } while (true);
433   }
434 
435 }
436