1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import org.apache.commons.logging.Log; 23 import org.apache.commons.logging.LogFactory; 24 import org.apache.hadoop.classification.InterfaceAudience.Private; 25 import org.apache.hadoop.conf.Configuration; 26 import org.apache.hadoop.ha.HAServiceProtocol; 27 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; 28 import org.apache.hadoop.http.lib.StaticUserWebFilter; 29 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 30 import org.apache.hadoop.metrics2.source.JvmMetrics; 31 import org.apache.hadoop.security.*; 32 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; 33 import org.apache.hadoop.security.authorize.ProxyUsers; 34 import org.apache.hadoop.service.AbstractService; 35 import org.apache.hadoop.service.CompositeService; 36 import org.apache.hadoop.service.Service; 37 import org.apache.hadoop.util.ExitUtil; 38 import org.apache.hadoop.util.GenericOptionsParser; 39 import org.apache.hadoop.util.ReflectionUtils; 40 import org.apache.hadoop.util.ShutdownHookManager; 41 import org.apache.hadoop.util.StringUtils; 42 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; 43 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 44 import org.apache.hadoop.yarn.api.records.ApplicationId; 45 import org.apache.hadoop.yarn.api.records.NodeId; 46 import org.apache.hadoop.yarn.conf.ConfigurationProvider; 47 import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; 48 import org.apache.hadoop.yarn.conf.HAUtil; 49 import org.apache.hadoop.yarn.conf.YarnConfiguration; 50 import org.apache.hadoop.yarn.event.AsyncDispatcher; 51 import org.apache.hadoop.yarn.event.Dispatcher; 52 import org.apache.hadoop.yarn.event.EventHandler; 53 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 54 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; 55 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; 56 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; 57 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; 58 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; 59 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; 60 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; 61 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; 62 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; 63 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; 64 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; 65 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; 66 import org.apache.hadoop.yarn.server.resourcemanager.reservation.AbstractReservationSystem; 67 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; 68 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; 69 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; 70 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; 71 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; 72 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; 73 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; 74 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; 75 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; 76 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; 77 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; 78 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; 79 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; 80 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; 81 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; 82 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; 83 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; 84 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; 85 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; 86 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter; 87 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; 88 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; 89 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; 90 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; 91 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; 92 import org.apache.hadoop.yarn.webapp.WebApp; 93 import org.apache.hadoop.yarn.webapp.WebApps; 94 import org.apache.hadoop.yarn.webapp.WebApps.Builder; 95 import org.apache.hadoop.yarn.webapp.util.WebAppUtils; 96 97 import java.io.IOException; 98 import java.io.InputStream; 99 import java.net.InetSocketAddress; 100 import java.security.PrivilegedExceptionAction; 101 import java.util.ArrayList; 102 import java.util.List; 103 import java.util.concurrent.BlockingQueue; 104 import java.util.concurrent.LinkedBlockingQueue; 105 106 /** 107 * The ResourceManager is the main class that is a set of components. 108 * "I am the ResourceManager. All your resources belong to us..." 109 * 110 */ 111 @SuppressWarnings("unchecked") 112 public class ResourceManager extends CompositeService implements Recoverable { 113 114 /** 115 * Priority of the ResourceManager shutdown hook. 116 */ 117 public static final int SHUTDOWN_HOOK_PRIORITY = 30; 118 119 private static final Log LOG = LogFactory.getLog(ResourceManager.class); 120 private static long clusterTimeStamp = System.currentTimeMillis(); 121 122 /** 123 * "Always On" services. Services that need to run always irrespective of 124 * the HA state of the RM. 125 */ 126 @VisibleForTesting 127 protected RMContextImpl rmContext; 128 private Dispatcher rmDispatcher; 129 @VisibleForTesting 130 protected AdminService adminService; 131 132 /** 133 * "Active" services. Services that need to run only on the Active RM. 134 * These services are managed (initialized, started, stopped) by the 135 * {@link CompositeService} RMActiveServices. 136 * 137 * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is 138 * in Active state. 139 */ 140 protected RMActiveServices activeServices; 141 protected RMSecretManagerService rmSecretManagerService; 142 143 protected ResourceScheduler scheduler; 144 protected ReservationSystem reservationSystem; 145 private ClientRMService clientRM; 146 protected ApplicationMasterService masterService; 147 protected NMLivelinessMonitor nmLivelinessMonitor; 148 protected NodesListManager nodesListManager; 149 protected RMAppManager rmAppManager; 150 protected ApplicationACLsManager applicationACLsManager; 151 protected QueueACLsManager queueACLsManager; 152 private WebApp webApp; 153 private AppReportFetcher fetcher = null; 154 protected ResourceTrackerService resourceTracker; 155 156 @VisibleForTesting 157 protected String webAppAddress; 158 private ConfigurationProvider configurationProvider = null; 159 /** End of Active services */ 160 161 private Configuration conf; 162 163 private UserGroupInformation rmLoginUGI; 164 ResourceManager()165 public ResourceManager() { 166 super("ResourceManager"); 167 } 168 getRMContext()169 public RMContext getRMContext() { 170 return this.rmContext; 171 } 172 getClusterTimeStamp()173 public static long getClusterTimeStamp() { 174 return clusterTimeStamp; 175 } 176 177 @VisibleForTesting setClusterTimeStamp(long timestamp)178 protected static void setClusterTimeStamp(long timestamp) { 179 clusterTimeStamp = timestamp; 180 } 181 182 @Override serviceInit(Configuration conf)183 protected void serviceInit(Configuration conf) throws Exception { 184 this.conf = conf; 185 this.rmContext = new RMContextImpl(); 186 187 this.configurationProvider = 188 ConfigurationProviderFactory.getConfigurationProvider(conf); 189 this.configurationProvider.init(this.conf); 190 rmContext.setConfigurationProvider(configurationProvider); 191 192 // load core-site.xml 193 InputStream coreSiteXMLInputStream = 194 this.configurationProvider.getConfigurationInputStream(this.conf, 195 YarnConfiguration.CORE_SITE_CONFIGURATION_FILE); 196 if (coreSiteXMLInputStream != null) { 197 this.conf.addResource(coreSiteXMLInputStream); 198 } 199 200 // Do refreshUserToGroupsMappings with loaded core-site.xml 201 Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(this.conf) 202 .refresh(); 203 204 // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml 205 // Or use RM specific configurations to overwrite the common ones first 206 // if they exist 207 RMServerUtils.processRMProxyUsersConf(conf); 208 ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf); 209 210 // load yarn-site.xml 211 InputStream yarnSiteXMLInputStream = 212 this.configurationProvider.getConfigurationInputStream(this.conf, 213 YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); 214 if (yarnSiteXMLInputStream != null) { 215 this.conf.addResource(yarnSiteXMLInputStream); 216 } 217 218 validateConfigs(this.conf); 219 220 // Set HA configuration should be done before login 221 this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); 222 if (this.rmContext.isHAEnabled()) { 223 HAUtil.verifyAndSetConfiguration(this.conf); 224 } 225 226 // Set UGI and do login 227 // If security is enabled, use login user 228 // If security is not enabled, use current user 229 this.rmLoginUGI = UserGroupInformation.getCurrentUser(); 230 try { 231 doSecureLogin(); 232 } catch(IOException ie) { 233 throw new YarnRuntimeException("Failed to login", ie); 234 } 235 236 // register the handlers for all AlwaysOn services using setupDispatcher(). 237 rmDispatcher = setupDispatcher(); 238 addIfService(rmDispatcher); 239 rmContext.setDispatcher(rmDispatcher); 240 241 adminService = createAdminService(); 242 addService(adminService); 243 rmContext.setRMAdminService(adminService); 244 245 rmContext.setYarnConfiguration(conf); 246 247 createAndInitActiveServices(); 248 249 webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, 250 YarnConfiguration.RM_BIND_HOST, 251 WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); 252 253 RMApplicationHistoryWriter rmApplicationHistoryWriter = 254 createRMApplicationHistoryWriter(); 255 addService(rmApplicationHistoryWriter); 256 rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); 257 258 SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher(); 259 addService(systemMetricsPublisher); 260 rmContext.setSystemMetricsPublisher(systemMetricsPublisher); 261 262 super.serviceInit(this.conf); 263 } 264 createQueueACLsManager(ResourceScheduler scheduler, Configuration conf)265 protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, 266 Configuration conf) { 267 return new QueueACLsManager(scheduler, conf); 268 } 269 270 @VisibleForTesting setRMStateStore(RMStateStore rmStore)271 protected void setRMStateStore(RMStateStore rmStore) { 272 rmStore.setRMDispatcher(rmDispatcher); 273 rmStore.setResourceManager(this); 274 rmContext.setStateStore(rmStore); 275 } 276 createSchedulerEventDispatcher()277 protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { 278 return new SchedulerEventDispatcher(this.scheduler); 279 } 280 createDispatcher()281 protected Dispatcher createDispatcher() { 282 return new AsyncDispatcher(); 283 } 284 createScheduler()285 protected ResourceScheduler createScheduler() { 286 String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER, 287 YarnConfiguration.DEFAULT_RM_SCHEDULER); 288 LOG.info("Using Scheduler: " + schedulerClassName); 289 try { 290 Class<?> schedulerClazz = Class.forName(schedulerClassName); 291 if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) { 292 return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz, 293 this.conf); 294 } else { 295 throw new YarnRuntimeException("Class: " + schedulerClassName 296 + " not instance of " + ResourceScheduler.class.getCanonicalName()); 297 } 298 } catch (ClassNotFoundException e) { 299 throw new YarnRuntimeException("Could not instantiate Scheduler: " 300 + schedulerClassName, e); 301 } 302 } 303 createReservationSystem()304 protected ReservationSystem createReservationSystem() { 305 String reservationClassName = 306 conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS, 307 AbstractReservationSystem.getDefaultReservationSystem(scheduler)); 308 if (reservationClassName == null) { 309 return null; 310 } 311 LOG.info("Using ReservationSystem: " + reservationClassName); 312 try { 313 Class<?> reservationClazz = Class.forName(reservationClassName); 314 if (ReservationSystem.class.isAssignableFrom(reservationClazz)) { 315 return (ReservationSystem) ReflectionUtils.newInstance( 316 reservationClazz, this.conf); 317 } else { 318 throw new YarnRuntimeException("Class: " + reservationClassName 319 + " not instance of " + ReservationSystem.class.getCanonicalName()); 320 } 321 } catch (ClassNotFoundException e) { 322 throw new YarnRuntimeException( 323 "Could not instantiate ReservationSystem: " + reservationClassName, e); 324 } 325 } 326 createAMLauncher()327 protected ApplicationMasterLauncher createAMLauncher() { 328 return new ApplicationMasterLauncher(this.rmContext); 329 } 330 createNMLivelinessMonitor()331 private NMLivelinessMonitor createNMLivelinessMonitor() { 332 return new NMLivelinessMonitor(this.rmContext 333 .getDispatcher()); 334 } 335 createAMLivelinessMonitor()336 protected AMLivelinessMonitor createAMLivelinessMonitor() { 337 return new AMLivelinessMonitor(this.rmDispatcher); 338 } 339 createNodeLabelManager()340 protected RMNodeLabelsManager createNodeLabelManager() 341 throws InstantiationException, IllegalAccessException { 342 return new RMNodeLabelsManager(); 343 } 344 createDelegationTokenRenewer()345 protected DelegationTokenRenewer createDelegationTokenRenewer() { 346 return new DelegationTokenRenewer(); 347 } 348 createRMAppManager()349 protected RMAppManager createRMAppManager() { 350 return new RMAppManager(this.rmContext, this.scheduler, this.masterService, 351 this.applicationACLsManager, this.conf); 352 } 353 createRMApplicationHistoryWriter()354 protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { 355 return new RMApplicationHistoryWriter(); 356 } 357 createSystemMetricsPublisher()358 protected SystemMetricsPublisher createSystemMetricsPublisher() { 359 return new SystemMetricsPublisher(); 360 } 361 362 // sanity check for configurations validateConfigs(Configuration conf)363 protected static void validateConfigs(Configuration conf) { 364 // validate max-attempts 365 int globalMaxAppAttempts = 366 conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 367 YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); 368 if (globalMaxAppAttempts <= 0) { 369 throw new YarnRuntimeException("Invalid global max attempts configuration" 370 + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS 371 + "=" + globalMaxAppAttempts + ", it should be a positive integer."); 372 } 373 374 // validate expireIntvl >= heartbeatIntvl 375 long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 376 YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); 377 long heartbeatIntvl = 378 conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 379 YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); 380 if (expireIntvl < heartbeatIntvl) { 381 throw new YarnRuntimeException("Nodemanager expiry interval should be no" 382 + " less than heartbeat interval, " 383 + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl 384 + ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "=" 385 + heartbeatIntvl); 386 } 387 } 388 389 /** 390 * RMActiveServices handles all the Active services in the RM. 391 */ 392 @Private 393 public class RMActiveServices extends CompositeService { 394 395 private DelegationTokenRenewer delegationTokenRenewer; 396 private EventHandler<SchedulerEvent> schedulerDispatcher; 397 private ApplicationMasterLauncher applicationMasterLauncher; 398 private ContainerAllocationExpirer containerAllocationExpirer; 399 private ResourceManager rm; 400 private boolean recoveryEnabled; 401 private RMActiveServiceContext activeServiceContext; 402 RMActiveServices(ResourceManager rm)403 RMActiveServices(ResourceManager rm) { 404 super("RMActiveServices"); 405 this.rm = rm; 406 } 407 408 @Override serviceInit(Configuration configuration)409 protected void serviceInit(Configuration configuration) throws Exception { 410 activeServiceContext = new RMActiveServiceContext(); 411 rmContext.setActiveServiceContext(activeServiceContext); 412 413 conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); 414 rmSecretManagerService = createRMSecretManagerService(); 415 addService(rmSecretManagerService); 416 417 containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); 418 addService(containerAllocationExpirer); 419 rmContext.setContainerAllocationExpirer(containerAllocationExpirer); 420 421 AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); 422 addService(amLivelinessMonitor); 423 rmContext.setAMLivelinessMonitor(amLivelinessMonitor); 424 425 AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); 426 addService(amFinishingMonitor); 427 rmContext.setAMFinishingMonitor(amFinishingMonitor); 428 429 RMNodeLabelsManager nlm = createNodeLabelManager(); 430 nlm.setRMContext(rmContext); 431 addService(nlm); 432 rmContext.setNodeLabelManager(nlm); 433 434 boolean isRecoveryEnabled = conf.getBoolean( 435 YarnConfiguration.RECOVERY_ENABLED, 436 YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); 437 438 RMStateStore rmStore = null; 439 if (isRecoveryEnabled) { 440 recoveryEnabled = true; 441 rmStore = RMStateStoreFactory.getStore(conf); 442 boolean isWorkPreservingRecoveryEnabled = 443 conf.getBoolean( 444 YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, 445 YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); 446 rmContext 447 .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled); 448 } else { 449 recoveryEnabled = false; 450 rmStore = new NullRMStateStore(); 451 } 452 453 try { 454 rmStore.init(conf); 455 rmStore.setRMDispatcher(rmDispatcher); 456 rmStore.setResourceManager(rm); 457 } catch (Exception e) { 458 // the Exception from stateStore.init() needs to be handled for 459 // HA and we need to give up master status if we got fenced 460 LOG.error("Failed to init state store", e); 461 throw e; 462 } 463 rmContext.setStateStore(rmStore); 464 465 if (UserGroupInformation.isSecurityEnabled()) { 466 delegationTokenRenewer = createDelegationTokenRenewer(); 467 rmContext.setDelegationTokenRenewer(delegationTokenRenewer); 468 } 469 470 // Register event handler for NodesListManager 471 nodesListManager = new NodesListManager(rmContext); 472 rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); 473 addService(nodesListManager); 474 rmContext.setNodesListManager(nodesListManager); 475 476 // Initialize the scheduler 477 scheduler = createScheduler(); 478 scheduler.setRMContext(rmContext); 479 addIfService(scheduler); 480 rmContext.setScheduler(scheduler); 481 482 schedulerDispatcher = createSchedulerEventDispatcher(); 483 addIfService(schedulerDispatcher); 484 rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); 485 486 // Register event handler for RmAppEvents 487 rmDispatcher.register(RMAppEventType.class, 488 new ApplicationEventDispatcher(rmContext)); 489 490 // Register event handler for RmAppAttemptEvents 491 rmDispatcher.register(RMAppAttemptEventType.class, 492 new ApplicationAttemptEventDispatcher(rmContext)); 493 494 // Register event handler for RmNodes 495 rmDispatcher.register( 496 RMNodeEventType.class, new NodeEventDispatcher(rmContext)); 497 498 nmLivelinessMonitor = createNMLivelinessMonitor(); 499 addService(nmLivelinessMonitor); 500 501 resourceTracker = createResourceTrackerService(); 502 addService(resourceTracker); 503 rmContext.setResourceTrackerService(resourceTracker); 504 505 DefaultMetricsSystem.initialize("ResourceManager"); 506 JvmMetrics.initSingleton("ResourceManager", null); 507 508 // Initialize the Reservation system 509 if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, 510 YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) { 511 reservationSystem = createReservationSystem(); 512 if (reservationSystem != null) { 513 reservationSystem.setRMContext(rmContext); 514 addIfService(reservationSystem); 515 rmContext.setReservationSystem(reservationSystem); 516 LOG.info("Initialized Reservation system"); 517 } 518 } 519 520 // creating monitors that handle preemption 521 createPolicyMonitors(); 522 523 masterService = createApplicationMasterService(); 524 addService(masterService) ; 525 rmContext.setApplicationMasterService(masterService); 526 527 applicationACLsManager = new ApplicationACLsManager(conf); 528 529 queueACLsManager = createQueueACLsManager(scheduler, conf); 530 531 rmAppManager = createRMAppManager(); 532 // Register event handler for RMAppManagerEvents 533 rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); 534 535 clientRM = createClientRMService(); 536 addService(clientRM); 537 rmContext.setClientRMService(clientRM); 538 539 applicationMasterLauncher = createAMLauncher(); 540 rmDispatcher.register(AMLauncherEventType.class, 541 applicationMasterLauncher); 542 543 addService(applicationMasterLauncher); 544 if (UserGroupInformation.isSecurityEnabled()) { 545 addService(delegationTokenRenewer); 546 delegationTokenRenewer.setRMContext(rmContext); 547 } 548 549 new RMNMInfo(rmContext, scheduler); 550 551 super.serviceInit(conf); 552 } 553 554 @Override serviceStart()555 protected void serviceStart() throws Exception { 556 RMStateStore rmStore = rmContext.getStateStore(); 557 // The state store needs to start irrespective of recoveryEnabled as apps 558 // need events to move to further states. 559 rmStore.start(); 560 561 if(recoveryEnabled) { 562 try { 563 LOG.info("Recovery started"); 564 rmStore.checkVersion(); 565 if (rmContext.isWorkPreservingRecoveryEnabled()) { 566 rmContext.setEpoch(rmStore.getAndIncrementEpoch()); 567 } 568 RMState state = rmStore.loadState(); 569 recover(state); 570 LOG.info("Recovery ended"); 571 } catch (Exception e) { 572 // the Exception from loadState() needs to be handled for 573 // HA and we need to give up master status if we got fenced 574 LOG.error("Failed to load/recover state", e); 575 throw e; 576 } 577 } 578 579 super.serviceStart(); 580 } 581 582 @Override serviceStop()583 protected void serviceStop() throws Exception { 584 585 super.serviceStop(); 586 DefaultMetricsSystem.shutdown(); 587 if (rmContext != null) { 588 RMStateStore store = rmContext.getStateStore(); 589 try { 590 store.close(); 591 } catch (Exception e) { 592 LOG.error("Error closing store.", e); 593 } 594 } 595 596 } 597 createPolicyMonitors()598 protected void createPolicyMonitors() { 599 if (scheduler instanceof PreemptableResourceScheduler 600 && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, 601 YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { 602 LOG.info("Loading policy monitors"); 603 List<SchedulingEditPolicy> policies = conf.getInstances( 604 YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, 605 SchedulingEditPolicy.class); 606 if (policies.size() > 0) { 607 for (SchedulingEditPolicy policy : policies) { 608 LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); 609 // periodically check whether we need to take action to guarantee 610 // constraints 611 SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy); 612 addService(mon); 613 } 614 } else { 615 LOG.warn("Policy monitors configured (" + 616 YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + 617 ") but none specified (" + 618 YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); 619 } 620 } 621 } 622 } 623 624 @Private 625 public static class SchedulerEventDispatcher extends AbstractService 626 implements EventHandler<SchedulerEvent> { 627 628 private final ResourceScheduler scheduler; 629 private final BlockingQueue<SchedulerEvent> eventQueue = 630 new LinkedBlockingQueue<SchedulerEvent>(); 631 private final Thread eventProcessor; 632 private volatile boolean stopped = false; 633 private boolean shouldExitOnError = false; 634 SchedulerEventDispatcher(ResourceScheduler scheduler)635 public SchedulerEventDispatcher(ResourceScheduler scheduler) { 636 super(SchedulerEventDispatcher.class.getName()); 637 this.scheduler = scheduler; 638 this.eventProcessor = new Thread(new EventProcessor()); 639 this.eventProcessor.setName("ResourceManager Event Processor"); 640 } 641 642 @Override serviceInit(Configuration conf)643 protected void serviceInit(Configuration conf) throws Exception { 644 this.shouldExitOnError = 645 conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, 646 Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); 647 super.serviceInit(conf); 648 } 649 650 @Override serviceStart()651 protected void serviceStart() throws Exception { 652 this.eventProcessor.start(); 653 super.serviceStart(); 654 } 655 656 private final class EventProcessor implements Runnable { 657 @Override run()658 public void run() { 659 660 SchedulerEvent event; 661 662 while (!stopped && !Thread.currentThread().isInterrupted()) { 663 try { 664 event = eventQueue.take(); 665 } catch (InterruptedException e) { 666 LOG.error("Returning, interrupted : " + e); 667 return; // TODO: Kill RM. 668 } 669 670 try { 671 scheduler.handle(event); 672 } catch (Throwable t) { 673 // An error occurred, but we are shutting down anyway. 674 // If it was an InterruptedException, the very act of 675 // shutdown could have caused it and is probably harmless. 676 if (stopped) { 677 LOG.warn("Exception during shutdown: ", t); 678 break; 679 } 680 LOG.fatal("Error in handling event type " + event.getType() 681 + " to the scheduler", t); 682 if (shouldExitOnError 683 && !ShutdownHookManager.get().isShutdownInProgress()) { 684 LOG.info("Exiting, bbye.."); 685 System.exit(-1); 686 } 687 } 688 } 689 } 690 } 691 692 @Override serviceStop()693 protected void serviceStop() throws Exception { 694 this.stopped = true; 695 this.eventProcessor.interrupt(); 696 try { 697 this.eventProcessor.join(); 698 } catch (InterruptedException e) { 699 throw new YarnRuntimeException(e); 700 } 701 super.serviceStop(); 702 } 703 704 @Override handle(SchedulerEvent event)705 public void handle(SchedulerEvent event) { 706 try { 707 int qSize = eventQueue.size(); 708 if (qSize !=0 && qSize %1000 == 0) { 709 LOG.info("Size of scheduler event-queue is " + qSize); 710 } 711 int remCapacity = eventQueue.remainingCapacity(); 712 if (remCapacity < 1000) { 713 LOG.info("Very low remaining capacity on scheduler event queue: " 714 + remCapacity); 715 } 716 this.eventQueue.put(event); 717 } catch (InterruptedException e) { 718 LOG.info("Interrupted. Trying to exit gracefully."); 719 } 720 } 721 } 722 723 @Private 724 public static class RMFatalEventDispatcher 725 implements EventHandler<RMFatalEvent> { 726 727 @Override handle(RMFatalEvent event)728 public void handle(RMFatalEvent event) { 729 LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " + 730 event.getType().name() + ". Cause:\n" + event.getCause()); 731 732 ExitUtil.terminate(1, event.getCause()); 733 } 734 } 735 handleTransitionToStandBy()736 public void handleTransitionToStandBy() { 737 if (rmContext.isHAEnabled()) { 738 try { 739 // Transition to standby and reinit active services 740 LOG.info("Transitioning RM to Standby mode"); 741 transitionToStandby(true); 742 adminService.resetLeaderElection(); 743 return; 744 } catch (Exception e) { 745 LOG.fatal("Failed to transition RM to Standby mode."); 746 ExitUtil.terminate(1, e); 747 } 748 } 749 } 750 751 @Private 752 public static final class ApplicationEventDispatcher implements 753 EventHandler<RMAppEvent> { 754 755 private final RMContext rmContext; 756 ApplicationEventDispatcher(RMContext rmContext)757 public ApplicationEventDispatcher(RMContext rmContext) { 758 this.rmContext = rmContext; 759 } 760 761 @Override handle(RMAppEvent event)762 public void handle(RMAppEvent event) { 763 ApplicationId appID = event.getApplicationId(); 764 RMApp rmApp = this.rmContext.getRMApps().get(appID); 765 if (rmApp != null) { 766 try { 767 rmApp.handle(event); 768 } catch (Throwable t) { 769 LOG.error("Error in handling event type " + event.getType() 770 + " for application " + appID, t); 771 } 772 } 773 } 774 } 775 776 @Private 777 public static final class ApplicationAttemptEventDispatcher implements 778 EventHandler<RMAppAttemptEvent> { 779 780 private final RMContext rmContext; 781 ApplicationAttemptEventDispatcher(RMContext rmContext)782 public ApplicationAttemptEventDispatcher(RMContext rmContext) { 783 this.rmContext = rmContext; 784 } 785 786 @Override handle(RMAppAttemptEvent event)787 public void handle(RMAppAttemptEvent event) { 788 ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); 789 ApplicationId appAttemptId = appAttemptID.getApplicationId(); 790 RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId); 791 if (rmApp != null) { 792 RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID); 793 if (rmAppAttempt != null) { 794 try { 795 rmAppAttempt.handle(event); 796 } catch (Throwable t) { 797 LOG.error("Error in handling event type " + event.getType() 798 + " for applicationAttempt " + appAttemptId, t); 799 } 800 } 801 } 802 } 803 } 804 805 @Private 806 public static final class NodeEventDispatcher implements 807 EventHandler<RMNodeEvent> { 808 809 private final RMContext rmContext; 810 NodeEventDispatcher(RMContext rmContext)811 public NodeEventDispatcher(RMContext rmContext) { 812 this.rmContext = rmContext; 813 } 814 815 @Override handle(RMNodeEvent event)816 public void handle(RMNodeEvent event) { 817 NodeId nodeId = event.getNodeId(); 818 RMNode node = this.rmContext.getRMNodes().get(nodeId); 819 if (node != null) { 820 try { 821 ((EventHandler<RMNodeEvent>) node).handle(event); 822 } catch (Throwable t) { 823 LOG.error("Error in handling event type " + event.getType() 824 + " for node " + nodeId, t); 825 } 826 } 827 } 828 } 829 startWepApp()830 protected void startWepApp() { 831 832 // Use the customized yarn filter instead of the standard kerberos filter to 833 // allow users to authenticate using delegation tokens 834 // 4 conditions need to be satisfied - 835 // 1. security is enabled 836 // 2. http auth type is set to kerberos 837 // 3. "yarn.resourcemanager.webapp.use-yarn-filter" override is set to true 838 // 4. hadoop.http.filter.initializers container AuthenticationFilterInitializer 839 840 Configuration conf = getConfig(); 841 boolean enableCorsFilter = 842 conf.getBoolean(YarnConfiguration.RM_WEBAPP_ENABLE_CORS_FILTER, 843 YarnConfiguration.DEFAULT_RM_WEBAPP_ENABLE_CORS_FILTER); 844 boolean useYarnAuthenticationFilter = 845 conf.getBoolean( 846 YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER, 847 YarnConfiguration.DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER); 848 String authPrefix = "hadoop.http.authentication."; 849 String authTypeKey = authPrefix + "type"; 850 String filterInitializerConfKey = "hadoop.http.filter.initializers"; 851 String actualInitializers = ""; 852 Class<?>[] initializersClasses = 853 conf.getClasses(filterInitializerConfKey); 854 855 // setup CORS 856 if (enableCorsFilter) { 857 conf.setBoolean(HttpCrossOriginFilterInitializer.PREFIX 858 + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true); 859 } 860 861 boolean hasHadoopAuthFilterInitializer = false; 862 boolean hasRMAuthFilterInitializer = false; 863 if (initializersClasses != null) { 864 for (Class<?> initializer : initializersClasses) { 865 if (initializer.getName().equals( 866 AuthenticationFilterInitializer.class.getName())) { 867 hasHadoopAuthFilterInitializer = true; 868 } 869 if (initializer.getName().equals( 870 RMAuthenticationFilterInitializer.class.getName())) { 871 hasRMAuthFilterInitializer = true; 872 } 873 } 874 if (UserGroupInformation.isSecurityEnabled() 875 && useYarnAuthenticationFilter 876 && hasHadoopAuthFilterInitializer 877 && conf.get(authTypeKey, "").equals( 878 KerberosAuthenticationHandler.TYPE)) { 879 ArrayList<String> target = new ArrayList<String>(); 880 for (Class<?> filterInitializer : initializersClasses) { 881 if (filterInitializer.getName().equals( 882 AuthenticationFilterInitializer.class.getName())) { 883 if (hasRMAuthFilterInitializer == false) { 884 target.add(RMAuthenticationFilterInitializer.class.getName()); 885 } 886 continue; 887 } 888 target.add(filterInitializer.getName()); 889 } 890 actualInitializers = StringUtils.join(",", target); 891 892 LOG.info("Using RM authentication filter(kerberos/delegation-token)" 893 + " for RM webapp authentication"); 894 RMAuthenticationFilter 895 .setDelegationTokenSecretManager(getClientRMService().rmDTSecretManager); 896 conf.set(filterInitializerConfKey, actualInitializers); 897 } 898 } 899 900 // if security is not enabled and the default filter initializer has not 901 // been set, set the initializer to include the 902 // RMAuthenticationFilterInitializer which in turn will set up the simple 903 // auth filter. 904 905 String initializers = conf.get(filterInitializerConfKey); 906 if (!UserGroupInformation.isSecurityEnabled()) { 907 if (initializersClasses == null || initializersClasses.length == 0) { 908 conf.set(filterInitializerConfKey, 909 RMAuthenticationFilterInitializer.class.getName()); 910 conf.set(authTypeKey, "simple"); 911 } else if (initializers.equals(StaticUserWebFilter.class.getName())) { 912 conf.set(filterInitializerConfKey, 913 RMAuthenticationFilterInitializer.class.getName() + "," 914 + initializers); 915 conf.set(authTypeKey, "simple"); 916 } 917 } 918 919 Builder<ApplicationMasterService> builder = 920 WebApps 921 .$for("cluster", ApplicationMasterService.class, masterService, 922 "ws") 923 .with(conf) 924 .withHttpSpnegoPrincipalKey( 925 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) 926 .withHttpSpnegoKeytabKey( 927 YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) 928 .at(webAppAddress); 929 String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf); 930 if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). 931 equals(proxyHostAndPort)) { 932 if (HAUtil.isHAEnabled(conf)) { 933 fetcher = new AppReportFetcher(conf); 934 } else { 935 fetcher = new AppReportFetcher(conf, getClientRMService()); 936 } 937 builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, 938 ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); 939 builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher); 940 String[] proxyParts = proxyHostAndPort.split(":"); 941 builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]); 942 943 } 944 webApp = builder.start(new RMWebApp(this)); 945 } 946 947 /** 948 * Helper method to create and init {@link #activeServices}. This creates an 949 * instance of {@link RMActiveServices} and initializes it. 950 * @throws Exception 951 */ createAndInitActiveServices()952 protected void createAndInitActiveServices() throws Exception { 953 activeServices = new RMActiveServices(this); 954 activeServices.init(conf); 955 } 956 957 /** 958 * Helper method to start {@link #activeServices}. 959 * @throws Exception 960 */ startActiveServices()961 void startActiveServices() throws Exception { 962 if (activeServices != null) { 963 clusterTimeStamp = System.currentTimeMillis(); 964 activeServices.start(); 965 } 966 } 967 968 /** 969 * Helper method to stop {@link #activeServices}. 970 * @throws Exception 971 */ stopActiveServices()972 void stopActiveServices() throws Exception { 973 if (activeServices != null) { 974 activeServices.stop(); 975 activeServices = null; 976 } 977 } 978 reinitialize(boolean initialize)979 void reinitialize(boolean initialize) throws Exception { 980 ClusterMetrics.destroy(); 981 QueueMetrics.clearQueueMetrics(); 982 if (initialize) { 983 resetDispatcher(); 984 createAndInitActiveServices(); 985 } 986 } 987 988 @VisibleForTesting areActiveServicesRunning()989 protected boolean areActiveServicesRunning() { 990 return activeServices != null && activeServices.isInState(STATE.STARTED); 991 } 992 transitionToActive()993 synchronized void transitionToActive() throws Exception { 994 if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) { 995 LOG.info("Already in active state"); 996 return; 997 } 998 999 LOG.info("Transitioning to active state"); 1000 1001 this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() { 1002 @Override 1003 public Void run() throws Exception { 1004 try { 1005 startActiveServices(); 1006 return null; 1007 } catch (Exception e) { 1008 reinitialize(true); 1009 throw e; 1010 } 1011 } 1012 }); 1013 1014 rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE); 1015 LOG.info("Transitioned to active state"); 1016 } 1017 transitionToStandby(boolean initialize)1018 synchronized void transitionToStandby(boolean initialize) 1019 throws Exception { 1020 if (rmContext.getHAServiceState() == 1021 HAServiceProtocol.HAServiceState.STANDBY) { 1022 LOG.info("Already in standby state"); 1023 return; 1024 } 1025 1026 LOG.info("Transitioning to standby state"); 1027 HAServiceState state = rmContext.getHAServiceState(); 1028 rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); 1029 if (state == HAServiceProtocol.HAServiceState.ACTIVE) { 1030 stopActiveServices(); 1031 reinitialize(initialize); 1032 } 1033 LOG.info("Transitioned to standby state"); 1034 } 1035 1036 @Override serviceStart()1037 protected void serviceStart() throws Exception { 1038 if (this.rmContext.isHAEnabled()) { 1039 transitionToStandby(true); 1040 } else { 1041 transitionToActive(); 1042 } 1043 1044 startWepApp(); 1045 if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, 1046 false)) { 1047 int port = webApp.port(); 1048 WebAppUtils.setRMWebAppPort(conf, port); 1049 } 1050 super.serviceStart(); 1051 } 1052 doSecureLogin()1053 protected void doSecureLogin() throws IOException { 1054 InetSocketAddress socAddr = getBindAddress(conf); 1055 SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB, 1056 YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName()); 1057 1058 // if security is enable, set rmLoginUGI as UGI of loginUser 1059 if (UserGroupInformation.isSecurityEnabled()) { 1060 this.rmLoginUGI = UserGroupInformation.getLoginUser(); 1061 } 1062 } 1063 1064 @Override serviceStop()1065 protected void serviceStop() throws Exception { 1066 if (webApp != null) { 1067 webApp.stop(); 1068 } 1069 if (fetcher != null) { 1070 fetcher.stop(); 1071 } 1072 if (configurationProvider != null) { 1073 configurationProvider.close(); 1074 } 1075 super.serviceStop(); 1076 transitionToStandby(false); 1077 rmContext.setHAServiceState(HAServiceState.STOPPING); 1078 } 1079 createResourceTrackerService()1080 protected ResourceTrackerService createResourceTrackerService() { 1081 return new ResourceTrackerService(this.rmContext, this.nodesListManager, 1082 this.nmLivelinessMonitor, 1083 this.rmContext.getContainerTokenSecretManager(), 1084 this.rmContext.getNMTokenSecretManager()); 1085 } 1086 createClientRMService()1087 protected ClientRMService createClientRMService() { 1088 return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, 1089 this.applicationACLsManager, this.queueACLsManager, 1090 this.rmContext.getRMDelegationTokenSecretManager()); 1091 } 1092 createApplicationMasterService()1093 protected ApplicationMasterService createApplicationMasterService() { 1094 return new ApplicationMasterService(this.rmContext, scheduler); 1095 } 1096 createAdminService()1097 protected AdminService createAdminService() { 1098 return new AdminService(this, rmContext); 1099 } 1100 createRMSecretManagerService()1101 protected RMSecretManagerService createRMSecretManagerService() { 1102 return new RMSecretManagerService(conf, rmContext); 1103 } 1104 1105 @Private getClientRMService()1106 public ClientRMService getClientRMService() { 1107 return this.clientRM; 1108 } 1109 1110 /** 1111 * return the scheduler. 1112 * @return the scheduler for the Resource Manager. 1113 */ 1114 @Private getResourceScheduler()1115 public ResourceScheduler getResourceScheduler() { 1116 return this.scheduler; 1117 } 1118 1119 /** 1120 * return the resource tracking component. 1121 * @return the resource tracking component. 1122 */ 1123 @Private getResourceTrackerService()1124 public ResourceTrackerService getResourceTrackerService() { 1125 return this.resourceTracker; 1126 } 1127 1128 @Private getApplicationMasterService()1129 public ApplicationMasterService getApplicationMasterService() { 1130 return this.masterService; 1131 } 1132 1133 @Private getApplicationACLsManager()1134 public ApplicationACLsManager getApplicationACLsManager() { 1135 return this.applicationACLsManager; 1136 } 1137 1138 @Private getQueueACLsManager()1139 public QueueACLsManager getQueueACLsManager() { 1140 return this.queueACLsManager; 1141 } 1142 1143 @Private getWebapp()1144 WebApp getWebapp() { 1145 return this.webApp; 1146 } 1147 1148 @Override recover(RMState state)1149 public void recover(RMState state) throws Exception { 1150 // recover RMdelegationTokenSecretManager 1151 rmContext.getRMDelegationTokenSecretManager().recover(state); 1152 1153 // recover AMRMTokenSecretManager 1154 rmContext.getAMRMTokenSecretManager().recover(state); 1155 1156 // recover applications 1157 rmAppManager.recover(state); 1158 1159 setSchedulerRecoveryStartAndWaitTime(state, conf); 1160 } 1161 main(String argv[])1162 public static void main(String argv[]) { 1163 Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); 1164 StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); 1165 try { 1166 Configuration conf = new YarnConfiguration(); 1167 GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); 1168 argv = hParser.getRemainingArgs(); 1169 // If -format-state-store, then delete RMStateStore; else startup normally 1170 if (argv.length == 1 && argv[0].equals("-format-state-store")) { 1171 deleteRMStateStore(conf); 1172 } else { 1173 ResourceManager resourceManager = new ResourceManager(); 1174 ShutdownHookManager.get().addShutdownHook( 1175 new CompositeServiceShutdownHook(resourceManager), 1176 SHUTDOWN_HOOK_PRIORITY); 1177 resourceManager.init(conf); 1178 resourceManager.start(); 1179 } 1180 } catch (Throwable t) { 1181 LOG.fatal("Error starting ResourceManager", t); 1182 System.exit(-1); 1183 } 1184 } 1185 1186 /** 1187 * Register the handlers for alwaysOn services 1188 */ setupDispatcher()1189 private Dispatcher setupDispatcher() { 1190 Dispatcher dispatcher = createDispatcher(); 1191 dispatcher.register(RMFatalEventType.class, 1192 new ResourceManager.RMFatalEventDispatcher()); 1193 return dispatcher; 1194 } 1195 resetDispatcher()1196 private void resetDispatcher() { 1197 Dispatcher dispatcher = setupDispatcher(); 1198 ((Service)dispatcher).init(this.conf); 1199 ((Service)dispatcher).start(); 1200 removeService((Service)rmDispatcher); 1201 // Need to stop previous rmDispatcher before assigning new dispatcher 1202 // otherwise causes "AsyncDispatcher event handler" thread leak 1203 ((Service) rmDispatcher).stop(); 1204 rmDispatcher = dispatcher; 1205 addIfService(rmDispatcher); 1206 rmContext.setDispatcher(rmDispatcher); 1207 } 1208 setSchedulerRecoveryStartAndWaitTime(RMState state, Configuration conf)1209 private void setSchedulerRecoveryStartAndWaitTime(RMState state, 1210 Configuration conf) { 1211 if (!state.getApplicationState().isEmpty()) { 1212 long waitTime = 1213 conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 1214 YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); 1215 rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime); 1216 } 1217 } 1218 1219 /** 1220 * Retrieve RM bind address from configuration 1221 * 1222 * @param conf 1223 * @return InetSocketAddress 1224 */ getBindAddress(Configuration conf)1225 public static InetSocketAddress getBindAddress(Configuration conf) { 1226 return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 1227 YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); 1228 } 1229 1230 /** 1231 * Deletes the RMStateStore 1232 * 1233 * @param conf 1234 * @throws Exception 1235 */ deleteRMStateStore(Configuration conf)1236 private static void deleteRMStateStore(Configuration conf) throws Exception { 1237 RMStateStore rmStore = RMStateStoreFactory.getStore(conf); 1238 rmStore.init(conf); 1239 rmStore.start(); 1240 try { 1241 LOG.info("Deleting ResourceManager state store..."); 1242 rmStore.deleteStore(); 1243 LOG.info("State store deleted"); 1244 } finally { 1245 rmStore.stop(); 1246 } 1247 } 1248 } 1249