/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.client.api;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;

/**
 * Abstract client used by an application master to register with the
 * ResourceManager, add and remove container requests, heartbeat through
 * {@link #allocate(float)} and finally unregister.
 *
 * @param <T> the concrete {@link ContainerRequest} type handled by the client
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    AbstractService {
  private static final Log LOG = LogFactory.getLog(AMRMClient.class);

  /**
   * Create a new instance of AMRMClient.
   * For usage:
   * <pre>
   * {@code
   * AMRMClient.<T>createAMRMClient()
   * }</pre>
   * @return the newly created AMRMClient instance.
   */
  @Public
  public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
    return new AMRMClientImpl<T>();
  }

  /**
   * NM token cache used by this client; initialised to the singleton cache
   * in the constructor and replaceable through
   * {@link #setNMTokenCache(NMTokenCache)}.
   */
  private NMTokenCache nmTokenCache;

  /**
   * Initialise the service with the given name and default the NM token
   * cache to {@link NMTokenCache#getSingleton()}.
   *
   * @param name the service name passed to {@link AbstractService}
   */
  @Private
  protected AMRMClient(String name) {
    super(name);
    nmTokenCache = NMTokenCache.getSingleton();
  }

  /**
   * Object to represent a single container request for resources. Scheduler
   * documentation should be consulted for the specifics of how the parameters
   * are honored.
   *
   * By default, YARN schedulers try to allocate containers at the requested
   * locations but they may relax the constraints in order to expedite meeting
   * allocations limits. They first relax the constraint to the same rack as the
   * requested node and then to anywhere in the cluster. The relaxLocality flag
   * may be used to disable locality relaxation and request containers at only
   * specific locations. The following conditions apply.
   * <ul>
   * <li>Within a priority, all container requests must have the same value for
   * locality relaxation. Either enabled or disabled.</li>
   * <li>If locality relaxation is disabled, then across requests, locations at
   * different network levels may not be specified. E.g. it is invalid to make a
   * request for a specific node and another request for a specific rack.</li>
   * <li>If locality relaxation is disabled, then only within the same request,
   * a node and its rack may be specified together. This allows for a specific
   * rack with a preference for a specific node within that rack.</li>
   * </ul>
   * To re-enable locality relaxation at a given priority, all pending requests
   * with locality relaxation disabled must be first removed. Then they can be
   * added back with locality relaxation enabled.
   *
   * All getters return immutable values.
   */
  public static class ContainerRequest {
    final Resource capability;
    final List<String> nodes;
    final List<String> racks;
    final Priority priority;
    final boolean relaxLocality;
    final String nodeLabelsExpression;

    /**
     * Instantiates a {@link ContainerRequest} with the given constraints and
     * locality relaxation enabled.
     *
     * @param capability
     *          The {@link Resource} to be requested for each container.
     * @param nodes
     *          Any hosts to request that the containers are placed on.
     * @param racks
     *          Any racks to request that the containers are placed on. The
     *          racks corresponding to any hosts requested will be automatically
     *          added to this list.
     * @param priority
     *          The priority at which to request the containers. Higher
     *          priorities have lower numerical values.
     */
    public ContainerRequest(Resource capability, String[] nodes,
        String[] racks, Priority priority) {
      this(capability, nodes, racks, priority, true, null);
    }

    /**
     * Instantiates a {@link ContainerRequest} with the given constraints.
     *
     * @param capability
     *          The {@link Resource} to be requested for each container.
     * @param nodes
     *          Any hosts to request that the containers are placed on.
     * @param racks
     *          Any racks to request that the containers are placed on. The
     *          racks corresponding to any hosts requested will be automatically
     *          added to this list.
     * @param priority
     *          The priority at which to request the containers. Higher
     *          priorities have lower numerical values.
     * @param relaxLocality
     *          If true, containers for this request may be assigned on hosts
     *          and racks other than the ones explicitly requested.
     */
    public ContainerRequest(Resource capability, String[] nodes,
        String[] racks, Priority priority, boolean relaxLocality) {
      this(capability, nodes, racks, priority, relaxLocality, null);
    }

    /**
     * Instantiates a {@link ContainerRequest} with the given constraints.
     *
     * @param capability
     *          The {@link Resource} to be requested for each container.
     *          Must not be null.
     * @param nodes
     *          Any hosts to request that the containers are placed on.
     * @param racks
     *          Any racks to request that the containers are placed on. The
     *          racks corresponding to any hosts requested will be automatically
     *          added to this list.
     * @param priority
     *          The priority at which to request the containers. Higher
     *          priorities have lower numerical values. Must not be null.
     * @param relaxLocality
     *          If true, containers for this request may be assigned on hosts
     *          and racks other than the ones explicitly requested.
     * @param nodeLabelsExpression
     *          Set node labels to allocate resource, now we only support
     *          asking for only a single node label
     * @throws IllegalArgumentException
     *           if <code>capability</code> or <code>priority</code> is null,
     *           or if locality relaxation is disabled while neither nodes nor
     *           racks are given.
     */
    public ContainerRequest(Resource capability, String[] nodes,
        String[] racks, Priority priority, boolean relaxLocality,
        String nodeLabelsExpression) {
      // Validate request
      Preconditions.checkArgument(capability != null,
          "The Resource to be requested for each container " +
              "should not be null ");
      Preconditions.checkArgument(priority != null,
          "The priority at which to request containers should not be null ");

      // A strict-locality request is meaningless without at least one
      // location, so require a rack or a node whenever relaxation is off.
      boolean hasRacks = racks != null && racks.length > 0;
      boolean hasNodes = nodes != null && nodes.length > 0;
      Preconditions.checkArgument(relaxLocality || hasRacks || hasNodes,
          "Can't turn off locality relaxation on a " +
              "request with no location constraints");

      this.capability = capability;
      // Snapshot the arrays so later changes by the caller are not visible;
      // a null array is preserved as a null list.
      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
      this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
      this.priority = priority;
      this.relaxLocality = relaxLocality;
      this.nodeLabelsExpression = nodeLabelsExpression;
    }

    /**
     * @return the {@link Resource} requested for each container
     */
    public Resource getCapability() {
      return capability;
    }

    /**
     * @return immutable list of requested hosts, or null if none were given
     */
    public List<String> getNodes() {
      return nodes;
    }

    /**
     * @return immutable list of requested racks, or null if none were given
     */
    public List<String> getRacks() {
      return racks;
    }

    /**
     * @return the priority of this request
     */
    public Priority getPriority() {
      return priority;
    }

    /**
     * @return true if locality relaxation is enabled for this request
     */
    public boolean getRelaxLocality() {
      return relaxLocality;
    }

    /**
     * @return the node label expression of this request, possibly null
     */
    public String getNodeLabelExpression() {
      return nodeLabelsExpression;
    }

    /**
     * @return a short description containing the capability and priority
     */
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("Capability[").append(capability).append("]");
      sb.append("Priority[").append(priority).append("]");
      return sb.toString();
    }
  }

  /**
   * Register the application master. This must be called before any
   * other interaction
   * @param appHostName Name of the host on which master is running
   * @param appHostPort Port master is listening on
   * @param appTrackingUrl URL at which the master info can be seen
   * @return <code>RegisterApplicationMasterResponse</code>
   * @throws YarnException as raised by the concrete client implementation
   * @throws IOException as raised by the concrete client implementation
   */
  public abstract RegisterApplicationMasterResponse
      registerApplicationMaster(String appHostName,
                                int appHostPort,
                                String appTrackingUrl)
      throws YarnException, IOException;

  /**
   * Request additional containers and receive new container allocations.
   * Requests made via <code>addContainerRequest</code> are sent to the
   * <code>ResourceManager</code>. New containers assigned to the master are
   * retrieved. Status of completed containers and node health updates are also
   * retrieved. This also doubles up as a heartbeat to the ResourceManager and
   * must be made periodically. The call may not always return any new
   * allocations of containers. App should not make concurrent allocate
   * requests. May cause request loss.
   *
   * <p>
   * Note : If the user has not removed container requests that have already
   * been satisfied, then the re-register may end up sending the entire
   * container requests to the RM (including matched requests). Which would mean
   * the RM could end up giving it a lot of new allocated containers.
   * </p>
   *
   * @param progressIndicator Indicates progress made by the master
   * @return the response of the allocate request
   * @throws YarnException as raised by the concrete client implementation
   * @throws IOException as raised by the concrete client implementation
   */
  public abstract AllocateResponse allocate(float progressIndicator)
      throws YarnException, IOException;

  /**
   * Unregister the application master. This must be called in the end.
   * @param appStatus Success/Failure status of the master
   * @param appMessage Diagnostics message on failure
   * @param appTrackingUrl New URL to get master info
   * @throws YarnException as raised by the concrete client implementation
   * @throws IOException as raised by the concrete client implementation
   */
  public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
                                                   String appMessage,
                                                   String appTrackingUrl)
      throws YarnException, IOException;

  /**
   * Request containers for resources before calling <code>allocate</code>
   * @param req Resource request
   */
  public abstract void addContainerRequest(T req);

  /**
   * Remove previous container request. The previous container request may have
   * already been sent to the ResourceManager. So even after the remove request
   * the app must be prepared to receive an allocation for the previous request.
   * @param req Resource request
   */
  public abstract void removeContainerRequest(T req);

  /**
   * Release containers assigned by the Resource Manager. If the app cannot use
   * the container or wants to give up the container then it can release them.
   * The app needs to make new requests for the released resource capability if
   * it still needs it. eg. it released non-local resources
   * @param containerId id of the container to release
   */
  public abstract void releaseAssignedContainer(ContainerId containerId);

  /**
   * Get the currently available resources in the cluster.
   * A valid value is available after a call to allocate has been made
   * @return Currently available resources
   */
  public abstract Resource getAvailableResources();

  /**
   * Get the current number of nodes in the cluster.
   * A valid value is available after a call to allocate has been made
   * @return Current number of nodes in the cluster
   */
  public abstract int getClusterNodeCount();

  /**
   * Get outstanding <code>ContainerRequest</code>s matching the given
   * parameters. These ContainerRequests should have been added via
   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
   * the AMRMClient may return its internal collection directly without creating
   * a copy. Users should not perform mutable operations on the return value.
   * Each collection in the list contains requests with identical
   * <code>Resource</code> size that fit in the given capability. In a
   * collection, requests will be returned in the same order as they were added.
   * @param priority priority the requests must match
   * @param resourceName resource name the requests must match
   * @param capability capability the matching requests must fit in
   * @return Collection of request matching the parameters
   */
  public abstract List<? extends Collection<T>> getMatchingRequests(
      Priority priority,
      String resourceName,
      Resource capability);

  /**
   * Update application's blacklist with addition or removal resources.
   *
   * @param blacklistAdditions list of resources which should be added to the
   *        application blacklist
   * @param blacklistRemovals list of resources which should be removed from the
   *        application blacklist
   */
  public abstract void updateBlacklist(List<String> blacklistAdditions,
      List<String> blacklistRemovals);

  /**
   * Set the NM token cache for the <code>AMRMClient</code>. This cache must
   * be shared with the {@link NMClient} used to manage containers for the
   * <code>AMRMClient</code>
   * <p>
   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
   * singleton instance will be used.
   *
   * @param nmTokenCache the NM token cache to use.
   */
  public void setNMTokenCache(NMTokenCache nmTokenCache) {
    this.nmTokenCache = nmTokenCache;
  }

  /**
   * Get the NM token cache of the <code>AMRMClient</code>. This cache must be
   * shared with the {@link NMClient} used to manage containers for the
   * <code>AMRMClient</code>.
   * <p>
   * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
   * singleton instance will be used.
   *
   * @return the NM token cache.
   */
  public NMTokenCache getNMTokenCache() {
    return nmTokenCache;
  }

  /**
   * Wait for <code>check</code> to return true, checking every 1000 ms.
   * See also {@link #waitFor(com.google.common.base.Supplier, int)}
   * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
   * @param check user defined checker
   * @throws InterruptedException if the thread is interrupted while sleeping
   *         between checks
   */
  public void waitFor(Supplier<Boolean> check) throws InterruptedException {
    waitFor(check, 1000);
  }

  /**
   * Wait for <code>check</code> to return true, checking every
   * <code>checkEveryMillis</code> ms and logging on every iteration.
   * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
   * @param check user defined checker
   * @param checkEveryMillis interval to call <code>check</code>
   * @throws InterruptedException if the thread is interrupted while sleeping
   *         between checks
   */
  public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
      throws InterruptedException {
    waitFor(check, checkEveryMillis, 1);
  }

  /**
   * Wait for <code>check</code> to return true, checking every
   * <code>checkEveryMillis</code> ms. In the main loop, this method will log
   * the message "Waiting in main loop." once per <code>logInterval</code>
   * iterations to confirm the thread is alive.
   * @param check user defined checker; must not be null
   * @param checkEveryMillis interval to call <code>check</code>; must be
   *        zero or greater
   * @param logInterval number of iterations between log messages; must be
   *        zero or greater
   * @throws NullPointerException if <code>check</code> is null
   * @throws IllegalArgumentException if <code>checkEveryMillis</code> or
   *         <code>logInterval</code> is negative
   * @throws InterruptedException if the thread is interrupted while sleeping
   *         between checks
   */
  public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
      int logInterval) throws InterruptedException {
    Preconditions.checkNotNull(check, "check should not be null");
    // Zero is accepted by both checks, so the messages say "non-negative".
    Preconditions.checkArgument(checkEveryMillis >= 0,
        "checkEveryMillis should be a non-negative value");
    Preconditions.checkArgument(logInterval >= 0,
        "logInterval should be a non-negative value");

    int loggingCounter = logInterval;
    while (true) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Check the condition for main loop.");
      }

      boolean result = check.get();
      if (result) {
        LOG.info("Exits the main loop.");
        return;
      }
      // Count down to the next liveness message, then restart the countdown.
      if (--loggingCounter <= 0) {
        LOG.info("Waiting in main loop.");
        loggingCounter = logInterval;
      }

      Thread.sleep(checkEveryMillis);
    }
  }

}