1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.classification.InterfaceAudience.Private;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.ha.HAServiceProtocol;
27 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
28 import org.apache.hadoop.http.lib.StaticUserWebFilter;
29 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
30 import org.apache.hadoop.metrics2.source.JvmMetrics;
31 import org.apache.hadoop.security.*;
32 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
33 import org.apache.hadoop.security.authorize.ProxyUsers;
34 import org.apache.hadoop.service.AbstractService;
35 import org.apache.hadoop.service.CompositeService;
36 import org.apache.hadoop.service.Service;
37 import org.apache.hadoop.util.ExitUtil;
38 import org.apache.hadoop.util.GenericOptionsParser;
39 import org.apache.hadoop.util.ReflectionUtils;
40 import org.apache.hadoop.util.ShutdownHookManager;
41 import org.apache.hadoop.util.StringUtils;
42 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
43 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
44 import org.apache.hadoop.yarn.api.records.ApplicationId;
45 import org.apache.hadoop.yarn.api.records.NodeId;
46 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
47 import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
48 import org.apache.hadoop.yarn.conf.HAUtil;
49 import org.apache.hadoop.yarn.conf.YarnConfiguration;
50 import org.apache.hadoop.yarn.event.AsyncDispatcher;
51 import org.apache.hadoop.yarn.event.Dispatcher;
52 import org.apache.hadoop.yarn.event.EventHandler;
53 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
54 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
55 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
56 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
57 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
58 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
59 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
60 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
61 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
62 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
63 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
64 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
65 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
66 import org.apache.hadoop.yarn.server.resourcemanager.reservation.AbstractReservationSystem;
67 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
68 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
69 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
70 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
71 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
72 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
73 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
74 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
75 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
76 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
77 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
78 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
79 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
80 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
81 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
82 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
83 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
84 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
85 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
86 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
87 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
88 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
89 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
90 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
91 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
92 import org.apache.hadoop.yarn.webapp.WebApp;
93 import org.apache.hadoop.yarn.webapp.WebApps;
94 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
95 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
96 
97 import java.io.IOException;
98 import java.io.InputStream;
99 import java.net.InetSocketAddress;
100 import java.security.PrivilegedExceptionAction;
101 import java.util.ArrayList;
102 import java.util.List;
103 import java.util.concurrent.BlockingQueue;
104 import java.util.concurrent.LinkedBlockingQueue;
105 
106 /**
107  * The ResourceManager is the main class that is a set of components.
108  * "I am the ResourceManager. All your resources belong to us..."
109  *
110  */
111 @SuppressWarnings("unchecked")
112 public class ResourceManager extends CompositeService implements Recoverable {
113 
114   /**
115    * Priority of the ResourceManager shutdown hook.
116    */
117   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
118 
119   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
120   private static long clusterTimeStamp = System.currentTimeMillis();
121 
122   /**
123    * "Always On" services. Services that need to run always irrespective of
124    * the HA state of the RM.
125    */
126   @VisibleForTesting
127   protected RMContextImpl rmContext;
128   private Dispatcher rmDispatcher;
129   @VisibleForTesting
130   protected AdminService adminService;
131 
132   /**
133    * "Active" services. Services that need to run only on the Active RM.
134    * These services are managed (initialized, started, stopped) by the
135    * {@link CompositeService} RMActiveServices.
136    *
137    * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
138    * in Active state.
139    */
140   protected RMActiveServices activeServices;
141   protected RMSecretManagerService rmSecretManagerService;
142 
143   protected ResourceScheduler scheduler;
144   protected ReservationSystem reservationSystem;
145   private ClientRMService clientRM;
146   protected ApplicationMasterService masterService;
147   protected NMLivelinessMonitor nmLivelinessMonitor;
148   protected NodesListManager nodesListManager;
149   protected RMAppManager rmAppManager;
150   protected ApplicationACLsManager applicationACLsManager;
151   protected QueueACLsManager queueACLsManager;
152   private WebApp webApp;
153   private AppReportFetcher fetcher = null;
154   protected ResourceTrackerService resourceTracker;
155 
156   @VisibleForTesting
157   protected String webAppAddress;
158   private ConfigurationProvider configurationProvider = null;
159   /** End of Active services */
160 
161   private Configuration conf;
162 
163   private UserGroupInformation rmLoginUGI;
164 
ResourceManager()165   public ResourceManager() {
166     super("ResourceManager");
167   }
168 
getRMContext()169   public RMContext getRMContext() {
170     return this.rmContext;
171   }
172 
getClusterTimeStamp()173   public static long getClusterTimeStamp() {
174     return clusterTimeStamp;
175   }
176 
177   @VisibleForTesting
setClusterTimeStamp(long timestamp)178   protected static void setClusterTimeStamp(long timestamp) {
179     clusterTimeStamp = timestamp;
180   }
181 
182   @Override
serviceInit(Configuration conf)183   protected void serviceInit(Configuration conf) throws Exception {
184     this.conf = conf;
185     this.rmContext = new RMContextImpl();
186 
187     this.configurationProvider =
188         ConfigurationProviderFactory.getConfigurationProvider(conf);
189     this.configurationProvider.init(this.conf);
190     rmContext.setConfigurationProvider(configurationProvider);
191 
192     // load core-site.xml
193     InputStream coreSiteXMLInputStream =
194         this.configurationProvider.getConfigurationInputStream(this.conf,
195             YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
196     if (coreSiteXMLInputStream != null) {
197       this.conf.addResource(coreSiteXMLInputStream);
198     }
199 
200     // Do refreshUserToGroupsMappings with loaded core-site.xml
201     Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(this.conf)
202         .refresh();
203 
204     // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
205     // Or use RM specific configurations to overwrite the common ones first
206     // if they exist
207     RMServerUtils.processRMProxyUsersConf(conf);
208     ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);
209 
210     // load yarn-site.xml
211     InputStream yarnSiteXMLInputStream =
212         this.configurationProvider.getConfigurationInputStream(this.conf,
213             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
214     if (yarnSiteXMLInputStream != null) {
215       this.conf.addResource(yarnSiteXMLInputStream);
216     }
217 
218     validateConfigs(this.conf);
219 
220     // Set HA configuration should be done before login
221     this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
222     if (this.rmContext.isHAEnabled()) {
223       HAUtil.verifyAndSetConfiguration(this.conf);
224     }
225 
226     // Set UGI and do login
227     // If security is enabled, use login user
228     // If security is not enabled, use current user
229     this.rmLoginUGI = UserGroupInformation.getCurrentUser();
230     try {
231       doSecureLogin();
232     } catch(IOException ie) {
233       throw new YarnRuntimeException("Failed to login", ie);
234     }
235 
236     // register the handlers for all AlwaysOn services using setupDispatcher().
237     rmDispatcher = setupDispatcher();
238     addIfService(rmDispatcher);
239     rmContext.setDispatcher(rmDispatcher);
240 
241     adminService = createAdminService();
242     addService(adminService);
243     rmContext.setRMAdminService(adminService);
244 
245     rmContext.setYarnConfiguration(conf);
246 
247     createAndInitActiveServices();
248 
249     webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
250                       YarnConfiguration.RM_BIND_HOST,
251                       WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
252 
253     RMApplicationHistoryWriter rmApplicationHistoryWriter =
254         createRMApplicationHistoryWriter();
255     addService(rmApplicationHistoryWriter);
256     rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
257 
258     SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
259     addService(systemMetricsPublisher);
260     rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
261 
262     super.serviceInit(this.conf);
263   }
264 
createQueueACLsManager(ResourceScheduler scheduler, Configuration conf)265   protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
266       Configuration conf) {
267     return new QueueACLsManager(scheduler, conf);
268   }
269 
270   @VisibleForTesting
setRMStateStore(RMStateStore rmStore)271   protected void setRMStateStore(RMStateStore rmStore) {
272     rmStore.setRMDispatcher(rmDispatcher);
273     rmStore.setResourceManager(this);
274     rmContext.setStateStore(rmStore);
275   }
276 
createSchedulerEventDispatcher()277   protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
278     return new SchedulerEventDispatcher(this.scheduler);
279   }
280 
createDispatcher()281   protected Dispatcher createDispatcher() {
282     return new AsyncDispatcher();
283   }
284 
createScheduler()285   protected ResourceScheduler createScheduler() {
286     String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
287         YarnConfiguration.DEFAULT_RM_SCHEDULER);
288     LOG.info("Using Scheduler: " + schedulerClassName);
289     try {
290       Class<?> schedulerClazz = Class.forName(schedulerClassName);
291       if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
292         return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
293             this.conf);
294       } else {
295         throw new YarnRuntimeException("Class: " + schedulerClassName
296             + " not instance of " + ResourceScheduler.class.getCanonicalName());
297       }
298     } catch (ClassNotFoundException e) {
299       throw new YarnRuntimeException("Could not instantiate Scheduler: "
300           + schedulerClassName, e);
301     }
302   }
303 
createReservationSystem()304   protected ReservationSystem createReservationSystem() {
305     String reservationClassName =
306         conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS,
307             AbstractReservationSystem.getDefaultReservationSystem(scheduler));
308     if (reservationClassName == null) {
309       return null;
310     }
311     LOG.info("Using ReservationSystem: " + reservationClassName);
312     try {
313       Class<?> reservationClazz = Class.forName(reservationClassName);
314       if (ReservationSystem.class.isAssignableFrom(reservationClazz)) {
315         return (ReservationSystem) ReflectionUtils.newInstance(
316             reservationClazz, this.conf);
317       } else {
318         throw new YarnRuntimeException("Class: " + reservationClassName
319             + " not instance of " + ReservationSystem.class.getCanonicalName());
320       }
321     } catch (ClassNotFoundException e) {
322       throw new YarnRuntimeException(
323           "Could not instantiate ReservationSystem: " + reservationClassName, e);
324     }
325   }
326 
createAMLauncher()327   protected ApplicationMasterLauncher createAMLauncher() {
328     return new ApplicationMasterLauncher(this.rmContext);
329   }
330 
createNMLivelinessMonitor()331   private NMLivelinessMonitor createNMLivelinessMonitor() {
332     return new NMLivelinessMonitor(this.rmContext
333         .getDispatcher());
334   }
335 
createAMLivelinessMonitor()336   protected AMLivelinessMonitor createAMLivelinessMonitor() {
337     return new AMLivelinessMonitor(this.rmDispatcher);
338   }
339 
createNodeLabelManager()340   protected RMNodeLabelsManager createNodeLabelManager()
341       throws InstantiationException, IllegalAccessException {
342     return new RMNodeLabelsManager();
343   }
344 
createDelegationTokenRenewer()345   protected DelegationTokenRenewer createDelegationTokenRenewer() {
346     return new DelegationTokenRenewer();
347   }
348 
createRMAppManager()349   protected RMAppManager createRMAppManager() {
350     return new RMAppManager(this.rmContext, this.scheduler, this.masterService,
351       this.applicationACLsManager, this.conf);
352   }
353 
createRMApplicationHistoryWriter()354   protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
355     return new RMApplicationHistoryWriter();
356   }
357 
createSystemMetricsPublisher()358   protected SystemMetricsPublisher createSystemMetricsPublisher() {
359     return new SystemMetricsPublisher();
360   }
361 
362   // sanity check for configurations
validateConfigs(Configuration conf)363   protected static void validateConfigs(Configuration conf) {
364     // validate max-attempts
365     int globalMaxAppAttempts =
366         conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
367         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
368     if (globalMaxAppAttempts <= 0) {
369       throw new YarnRuntimeException("Invalid global max attempts configuration"
370           + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
371           + "=" + globalMaxAppAttempts + ", it should be a positive integer.");
372     }
373 
374     // validate expireIntvl >= heartbeatIntvl
375     long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
376         YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
377     long heartbeatIntvl =
378         conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
379             YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
380     if (expireIntvl < heartbeatIntvl) {
381       throw new YarnRuntimeException("Nodemanager expiry interval should be no"
382           + " less than heartbeat interval, "
383           + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
384           + ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
385           + heartbeatIntvl);
386     }
387   }
388 
389   /**
390    * RMActiveServices handles all the Active services in the RM.
391    */
392   @Private
393   public class RMActiveServices extends CompositeService {
394 
395     private DelegationTokenRenewer delegationTokenRenewer;
396     private EventHandler<SchedulerEvent> schedulerDispatcher;
397     private ApplicationMasterLauncher applicationMasterLauncher;
398     private ContainerAllocationExpirer containerAllocationExpirer;
399     private ResourceManager rm;
400     private boolean recoveryEnabled;
401     private RMActiveServiceContext activeServiceContext;
402 
RMActiveServices(ResourceManager rm)403     RMActiveServices(ResourceManager rm) {
404       super("RMActiveServices");
405       this.rm = rm;
406     }
407 
408     @Override
serviceInit(Configuration configuration)409     protected void serviceInit(Configuration configuration) throws Exception {
410       activeServiceContext = new RMActiveServiceContext();
411       rmContext.setActiveServiceContext(activeServiceContext);
412 
413       conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
414       rmSecretManagerService = createRMSecretManagerService();
415       addService(rmSecretManagerService);
416 
417       containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
418       addService(containerAllocationExpirer);
419       rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
420 
421       AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
422       addService(amLivelinessMonitor);
423       rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
424 
425       AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
426       addService(amFinishingMonitor);
427       rmContext.setAMFinishingMonitor(amFinishingMonitor);
428 
429       RMNodeLabelsManager nlm = createNodeLabelManager();
430       nlm.setRMContext(rmContext);
431       addService(nlm);
432       rmContext.setNodeLabelManager(nlm);
433 
434       boolean isRecoveryEnabled = conf.getBoolean(
435           YarnConfiguration.RECOVERY_ENABLED,
436           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
437 
438       RMStateStore rmStore = null;
439       if (isRecoveryEnabled) {
440         recoveryEnabled = true;
441         rmStore = RMStateStoreFactory.getStore(conf);
442         boolean isWorkPreservingRecoveryEnabled =
443             conf.getBoolean(
444               YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
445               YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
446         rmContext
447           .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
448       } else {
449         recoveryEnabled = false;
450         rmStore = new NullRMStateStore();
451       }
452 
453       try {
454         rmStore.init(conf);
455         rmStore.setRMDispatcher(rmDispatcher);
456         rmStore.setResourceManager(rm);
457       } catch (Exception e) {
458         // the Exception from stateStore.init() needs to be handled for
459         // HA and we need to give up master status if we got fenced
460         LOG.error("Failed to init state store", e);
461         throw e;
462       }
463       rmContext.setStateStore(rmStore);
464 
465       if (UserGroupInformation.isSecurityEnabled()) {
466         delegationTokenRenewer = createDelegationTokenRenewer();
467         rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
468       }
469 
470       // Register event handler for NodesListManager
471       nodesListManager = new NodesListManager(rmContext);
472       rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
473       addService(nodesListManager);
474       rmContext.setNodesListManager(nodesListManager);
475 
476       // Initialize the scheduler
477       scheduler = createScheduler();
478       scheduler.setRMContext(rmContext);
479       addIfService(scheduler);
480       rmContext.setScheduler(scheduler);
481 
482       schedulerDispatcher = createSchedulerEventDispatcher();
483       addIfService(schedulerDispatcher);
484       rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
485 
486       // Register event handler for RmAppEvents
487       rmDispatcher.register(RMAppEventType.class,
488           new ApplicationEventDispatcher(rmContext));
489 
490       // Register event handler for RmAppAttemptEvents
491       rmDispatcher.register(RMAppAttemptEventType.class,
492           new ApplicationAttemptEventDispatcher(rmContext));
493 
494       // Register event handler for RmNodes
495       rmDispatcher.register(
496           RMNodeEventType.class, new NodeEventDispatcher(rmContext));
497 
498       nmLivelinessMonitor = createNMLivelinessMonitor();
499       addService(nmLivelinessMonitor);
500 
501       resourceTracker = createResourceTrackerService();
502       addService(resourceTracker);
503       rmContext.setResourceTrackerService(resourceTracker);
504 
505       DefaultMetricsSystem.initialize("ResourceManager");
506       JvmMetrics.initSingleton("ResourceManager", null);
507 
508       // Initialize the Reservation system
509       if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
510           YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
511         reservationSystem = createReservationSystem();
512         if (reservationSystem != null) {
513           reservationSystem.setRMContext(rmContext);
514           addIfService(reservationSystem);
515           rmContext.setReservationSystem(reservationSystem);
516           LOG.info("Initialized Reservation system");
517         }
518       }
519 
520       // creating monitors that handle preemption
521       createPolicyMonitors();
522 
523       masterService = createApplicationMasterService();
524       addService(masterService) ;
525       rmContext.setApplicationMasterService(masterService);
526 
527       applicationACLsManager = new ApplicationACLsManager(conf);
528 
529       queueACLsManager = createQueueACLsManager(scheduler, conf);
530 
531       rmAppManager = createRMAppManager();
532       // Register event handler for RMAppManagerEvents
533       rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
534 
535       clientRM = createClientRMService();
536       addService(clientRM);
537       rmContext.setClientRMService(clientRM);
538 
539       applicationMasterLauncher = createAMLauncher();
540       rmDispatcher.register(AMLauncherEventType.class,
541           applicationMasterLauncher);
542 
543       addService(applicationMasterLauncher);
544       if (UserGroupInformation.isSecurityEnabled()) {
545         addService(delegationTokenRenewer);
546         delegationTokenRenewer.setRMContext(rmContext);
547       }
548 
549       new RMNMInfo(rmContext, scheduler);
550 
551       super.serviceInit(conf);
552     }
553 
554     @Override
serviceStart()555     protected void serviceStart() throws Exception {
556       RMStateStore rmStore = rmContext.getStateStore();
557       // The state store needs to start irrespective of recoveryEnabled as apps
558       // need events to move to further states.
559       rmStore.start();
560 
561       if(recoveryEnabled) {
562         try {
563           LOG.info("Recovery started");
564           rmStore.checkVersion();
565           if (rmContext.isWorkPreservingRecoveryEnabled()) {
566             rmContext.setEpoch(rmStore.getAndIncrementEpoch());
567           }
568           RMState state = rmStore.loadState();
569           recover(state);
570           LOG.info("Recovery ended");
571         } catch (Exception e) {
572           // the Exception from loadState() needs to be handled for
573           // HA and we need to give up master status if we got fenced
574           LOG.error("Failed to load/recover state", e);
575           throw e;
576         }
577       }
578 
579       super.serviceStart();
580     }
581 
582     @Override
serviceStop()583     protected void serviceStop() throws Exception {
584 
585       super.serviceStop();
586       DefaultMetricsSystem.shutdown();
587       if (rmContext != null) {
588         RMStateStore store = rmContext.getStateStore();
589         try {
590           store.close();
591         } catch (Exception e) {
592           LOG.error("Error closing store.", e);
593         }
594       }
595 
596     }
597 
createPolicyMonitors()598     protected void createPolicyMonitors() {
599       if (scheduler instanceof PreemptableResourceScheduler
600           && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
601           YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
602         LOG.info("Loading policy monitors");
603         List<SchedulingEditPolicy> policies = conf.getInstances(
604             YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
605             SchedulingEditPolicy.class);
606         if (policies.size() > 0) {
607           for (SchedulingEditPolicy policy : policies) {
608             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
609             // periodically check whether we need to take action to guarantee
610             // constraints
611             SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
612             addService(mon);
613           }
614         } else {
615           LOG.warn("Policy monitors configured (" +
616               YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS +
617               ") but none specified (" +
618               YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")");
619         }
620       }
621     }
622   }
623 
624   @Private
625   public static class SchedulerEventDispatcher extends AbstractService
626       implements EventHandler<SchedulerEvent> {
627 
628     private final ResourceScheduler scheduler;
629     private final BlockingQueue<SchedulerEvent> eventQueue =
630       new LinkedBlockingQueue<SchedulerEvent>();
631     private final Thread eventProcessor;
632     private volatile boolean stopped = false;
633     private boolean shouldExitOnError = false;
634 
SchedulerEventDispatcher(ResourceScheduler scheduler)635     public SchedulerEventDispatcher(ResourceScheduler scheduler) {
636       super(SchedulerEventDispatcher.class.getName());
637       this.scheduler = scheduler;
638       this.eventProcessor = new Thread(new EventProcessor());
639       this.eventProcessor.setName("ResourceManager Event Processor");
640     }
641 
642     @Override
serviceInit(Configuration conf)643     protected void serviceInit(Configuration conf) throws Exception {
644       this.shouldExitOnError =
645           conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
646             Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
647       super.serviceInit(conf);
648     }
649 
650     @Override
serviceStart()651     protected void serviceStart() throws Exception {
652       this.eventProcessor.start();
653       super.serviceStart();
654     }
655 
656     private final class EventProcessor implements Runnable {
657       @Override
run()658       public void run() {
659 
660         SchedulerEvent event;
661 
662         while (!stopped && !Thread.currentThread().isInterrupted()) {
663           try {
664             event = eventQueue.take();
665           } catch (InterruptedException e) {
666             LOG.error("Returning, interrupted : " + e);
667             return; // TODO: Kill RM.
668           }
669 
670           try {
671             scheduler.handle(event);
672           } catch (Throwable t) {
673             // An error occurred, but we are shutting down anyway.
674             // If it was an InterruptedException, the very act of
675             // shutdown could have caused it and is probably harmless.
676             if (stopped) {
677               LOG.warn("Exception during shutdown: ", t);
678               break;
679             }
680             LOG.fatal("Error in handling event type " + event.getType()
681                 + " to the scheduler", t);
682             if (shouldExitOnError
683                 && !ShutdownHookManager.get().isShutdownInProgress()) {
684               LOG.info("Exiting, bbye..");
685               System.exit(-1);
686             }
687           }
688         }
689       }
690     }
691 
692     @Override
serviceStop()693     protected void serviceStop() throws Exception {
694       this.stopped = true;
695       this.eventProcessor.interrupt();
696       try {
697         this.eventProcessor.join();
698       } catch (InterruptedException e) {
699         throw new YarnRuntimeException(e);
700       }
701       super.serviceStop();
702     }
703 
704     @Override
handle(SchedulerEvent event)705     public void handle(SchedulerEvent event) {
706       try {
707         int qSize = eventQueue.size();
708         if (qSize !=0 && qSize %1000 == 0) {
709           LOG.info("Size of scheduler event-queue is " + qSize);
710         }
711         int remCapacity = eventQueue.remainingCapacity();
712         if (remCapacity < 1000) {
713           LOG.info("Very low remaining capacity on scheduler event queue: "
714               + remCapacity);
715         }
716         this.eventQueue.put(event);
717       } catch (InterruptedException e) {
718         LOG.info("Interrupted. Trying to exit gracefully.");
719       }
720     }
721   }
722 
723   @Private
724   public static class RMFatalEventDispatcher
725       implements EventHandler<RMFatalEvent> {
726 
727     @Override
handle(RMFatalEvent event)728     public void handle(RMFatalEvent event) {
729       LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
730           event.getType().name() + ". Cause:\n" + event.getCause());
731 
732       ExitUtil.terminate(1, event.getCause());
733     }
734   }
735 
handleTransitionToStandBy()736   public void handleTransitionToStandBy() {
737     if (rmContext.isHAEnabled()) {
738       try {
739         // Transition to standby and reinit active services
740         LOG.info("Transitioning RM to Standby mode");
741         transitionToStandby(true);
742         adminService.resetLeaderElection();
743         return;
744       } catch (Exception e) {
745         LOG.fatal("Failed to transition RM to Standby mode.");
746         ExitUtil.terminate(1, e);
747       }
748     }
749   }
750 
751   @Private
752   public static final class ApplicationEventDispatcher implements
753       EventHandler<RMAppEvent> {
754 
755     private final RMContext rmContext;
756 
ApplicationEventDispatcher(RMContext rmContext)757     public ApplicationEventDispatcher(RMContext rmContext) {
758       this.rmContext = rmContext;
759     }
760 
761     @Override
handle(RMAppEvent event)762     public void handle(RMAppEvent event) {
763       ApplicationId appID = event.getApplicationId();
764       RMApp rmApp = this.rmContext.getRMApps().get(appID);
765       if (rmApp != null) {
766         try {
767           rmApp.handle(event);
768         } catch (Throwable t) {
769           LOG.error("Error in handling event type " + event.getType()
770               + " for application " + appID, t);
771         }
772       }
773     }
774   }
775 
776   @Private
777   public static final class ApplicationAttemptEventDispatcher implements
778       EventHandler<RMAppAttemptEvent> {
779 
780     private final RMContext rmContext;
781 
ApplicationAttemptEventDispatcher(RMContext rmContext)782     public ApplicationAttemptEventDispatcher(RMContext rmContext) {
783       this.rmContext = rmContext;
784     }
785 
786     @Override
handle(RMAppAttemptEvent event)787     public void handle(RMAppAttemptEvent event) {
788       ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
789       ApplicationId appAttemptId = appAttemptID.getApplicationId();
790       RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId);
791       if (rmApp != null) {
792         RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
793         if (rmAppAttempt != null) {
794           try {
795             rmAppAttempt.handle(event);
796           } catch (Throwable t) {
797             LOG.error("Error in handling event type " + event.getType()
798                 + " for applicationAttempt " + appAttemptId, t);
799           }
800         }
801       }
802     }
803   }
804 
805   @Private
806   public static final class NodeEventDispatcher implements
807       EventHandler<RMNodeEvent> {
808 
809     private final RMContext rmContext;
810 
NodeEventDispatcher(RMContext rmContext)811     public NodeEventDispatcher(RMContext rmContext) {
812       this.rmContext = rmContext;
813     }
814 
815     @Override
handle(RMNodeEvent event)816     public void handle(RMNodeEvent event) {
817       NodeId nodeId = event.getNodeId();
818       RMNode node = this.rmContext.getRMNodes().get(nodeId);
819       if (node != null) {
820         try {
821           ((EventHandler<RMNodeEvent>) node).handle(event);
822         } catch (Throwable t) {
823           LOG.error("Error in handling event type " + event.getType()
824               + " for node " + nodeId, t);
825         }
826       }
827     }
828   }
829 
startWepApp()830   protected void startWepApp() {
831 
832     // Use the customized yarn filter instead of the standard kerberos filter to
833     // allow users to authenticate using delegation tokens
834     // 4 conditions need to be satisfied -
835     // 1. security is enabled
836     // 2. http auth type is set to kerberos
837     // 3. "yarn.resourcemanager.webapp.use-yarn-filter" override is set to true
838     // 4. hadoop.http.filter.initializers container AuthenticationFilterInitializer
839 
840     Configuration conf = getConfig();
841     boolean enableCorsFilter =
842         conf.getBoolean(YarnConfiguration.RM_WEBAPP_ENABLE_CORS_FILTER,
843             YarnConfiguration.DEFAULT_RM_WEBAPP_ENABLE_CORS_FILTER);
844     boolean useYarnAuthenticationFilter =
845         conf.getBoolean(
846           YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER,
847           YarnConfiguration.DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER);
848     String authPrefix = "hadoop.http.authentication.";
849     String authTypeKey = authPrefix + "type";
850     String filterInitializerConfKey = "hadoop.http.filter.initializers";
851     String actualInitializers = "";
852     Class<?>[] initializersClasses =
853         conf.getClasses(filterInitializerConfKey);
854 
855     // setup CORS
856     if (enableCorsFilter) {
857       conf.setBoolean(HttpCrossOriginFilterInitializer.PREFIX
858           + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true);
859     }
860 
861     boolean hasHadoopAuthFilterInitializer = false;
862     boolean hasRMAuthFilterInitializer = false;
863     if (initializersClasses != null) {
864       for (Class<?> initializer : initializersClasses) {
865         if (initializer.getName().equals(
866           AuthenticationFilterInitializer.class.getName())) {
867           hasHadoopAuthFilterInitializer = true;
868         }
869         if (initializer.getName().equals(
870           RMAuthenticationFilterInitializer.class.getName())) {
871           hasRMAuthFilterInitializer = true;
872         }
873       }
874       if (UserGroupInformation.isSecurityEnabled()
875           && useYarnAuthenticationFilter
876           && hasHadoopAuthFilterInitializer
877           && conf.get(authTypeKey, "").equals(
878             KerberosAuthenticationHandler.TYPE)) {
879         ArrayList<String> target = new ArrayList<String>();
880         for (Class<?> filterInitializer : initializersClasses) {
881           if (filterInitializer.getName().equals(
882             AuthenticationFilterInitializer.class.getName())) {
883             if (hasRMAuthFilterInitializer == false) {
884               target.add(RMAuthenticationFilterInitializer.class.getName());
885             }
886             continue;
887           }
888           target.add(filterInitializer.getName());
889         }
890         actualInitializers = StringUtils.join(",", target);
891 
892         LOG.info("Using RM authentication filter(kerberos/delegation-token)"
893             + " for RM webapp authentication");
894         RMAuthenticationFilter
895           .setDelegationTokenSecretManager(getClientRMService().rmDTSecretManager);
896         conf.set(filterInitializerConfKey, actualInitializers);
897       }
898     }
899 
900     // if security is not enabled and the default filter initializer has not
901     // been set, set the initializer to include the
902     // RMAuthenticationFilterInitializer which in turn will set up the simple
903     // auth filter.
904 
905     String initializers = conf.get(filterInitializerConfKey);
906     if (!UserGroupInformation.isSecurityEnabled()) {
907       if (initializersClasses == null || initializersClasses.length == 0) {
908         conf.set(filterInitializerConfKey,
909           RMAuthenticationFilterInitializer.class.getName());
910         conf.set(authTypeKey, "simple");
911       } else if (initializers.equals(StaticUserWebFilter.class.getName())) {
912         conf.set(filterInitializerConfKey,
913           RMAuthenticationFilterInitializer.class.getName() + ","
914               + initializers);
915         conf.set(authTypeKey, "simple");
916       }
917     }
918 
919     Builder<ApplicationMasterService> builder =
920         WebApps
921             .$for("cluster", ApplicationMasterService.class, masterService,
922                 "ws")
923             .with(conf)
924             .withHttpSpnegoPrincipalKey(
925                 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
926             .withHttpSpnegoKeytabKey(
927                 YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
928             .at(webAppAddress);
929     String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
930     if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
931         equals(proxyHostAndPort)) {
932       if (HAUtil.isHAEnabled(conf)) {
933         fetcher = new AppReportFetcher(conf);
934       } else {
935         fetcher = new AppReportFetcher(conf, getClientRMService());
936       }
937       builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
938           ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
939       builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
940       String[] proxyParts = proxyHostAndPort.split(":");
941       builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]);
942 
943     }
944     webApp = builder.start(new RMWebApp(this));
945   }
946 
947   /**
948    * Helper method to create and init {@link #activeServices}. This creates an
949    * instance of {@link RMActiveServices} and initializes it.
950    * @throws Exception
951    */
createAndInitActiveServices()952   protected void createAndInitActiveServices() throws Exception {
953     activeServices = new RMActiveServices(this);
954     activeServices.init(conf);
955   }
956 
957   /**
958    * Helper method to start {@link #activeServices}.
959    * @throws Exception
960    */
startActiveServices()961   void startActiveServices() throws Exception {
962     if (activeServices != null) {
963       clusterTimeStamp = System.currentTimeMillis();
964       activeServices.start();
965     }
966   }
967 
968   /**
969    * Helper method to stop {@link #activeServices}.
970    * @throws Exception
971    */
stopActiveServices()972   void stopActiveServices() throws Exception {
973     if (activeServices != null) {
974       activeServices.stop();
975       activeServices = null;
976     }
977   }
978 
reinitialize(boolean initialize)979   void reinitialize(boolean initialize) throws Exception {
980     ClusterMetrics.destroy();
981     QueueMetrics.clearQueueMetrics();
982     if (initialize) {
983       resetDispatcher();
984       createAndInitActiveServices();
985     }
986   }
987 
988   @VisibleForTesting
areActiveServicesRunning()989   protected boolean areActiveServicesRunning() {
990     return activeServices != null && activeServices.isInState(STATE.STARTED);
991   }
992 
transitionToActive()993   synchronized void transitionToActive() throws Exception {
994     if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
995       LOG.info("Already in active state");
996       return;
997     }
998 
999     LOG.info("Transitioning to active state");
1000 
1001     this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
1002       @Override
1003       public Void run() throws Exception {
1004         try {
1005           startActiveServices();
1006           return null;
1007         } catch (Exception e) {
1008           reinitialize(true);
1009           throw e;
1010         }
1011       }
1012     });
1013 
1014     rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
1015     LOG.info("Transitioned to active state");
1016   }
1017 
transitionToStandby(boolean initialize)1018   synchronized void transitionToStandby(boolean initialize)
1019       throws Exception {
1020     if (rmContext.getHAServiceState() ==
1021         HAServiceProtocol.HAServiceState.STANDBY) {
1022       LOG.info("Already in standby state");
1023       return;
1024     }
1025 
1026     LOG.info("Transitioning to standby state");
1027     HAServiceState state = rmContext.getHAServiceState();
1028     rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
1029     if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
1030       stopActiveServices();
1031       reinitialize(initialize);
1032     }
1033     LOG.info("Transitioned to standby state");
1034   }
1035 
1036   @Override
serviceStart()1037   protected void serviceStart() throws Exception {
1038     if (this.rmContext.isHAEnabled()) {
1039       transitionToStandby(true);
1040     } else {
1041       transitionToActive();
1042     }
1043 
1044     startWepApp();
1045     if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
1046         false)) {
1047       int port = webApp.port();
1048       WebAppUtils.setRMWebAppPort(conf, port);
1049     }
1050     super.serviceStart();
1051   }
1052 
doSecureLogin()1053   protected void doSecureLogin() throws IOException {
1054 	InetSocketAddress socAddr = getBindAddress(conf);
1055     SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
1056         YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());
1057 
1058     // if security is enable, set rmLoginUGI as UGI of loginUser
1059     if (UserGroupInformation.isSecurityEnabled()) {
1060       this.rmLoginUGI = UserGroupInformation.getLoginUser();
1061     }
1062   }
1063 
1064   @Override
serviceStop()1065   protected void serviceStop() throws Exception {
1066     if (webApp != null) {
1067       webApp.stop();
1068     }
1069     if (fetcher != null) {
1070       fetcher.stop();
1071     }
1072     if (configurationProvider != null) {
1073       configurationProvider.close();
1074     }
1075     super.serviceStop();
1076     transitionToStandby(false);
1077     rmContext.setHAServiceState(HAServiceState.STOPPING);
1078   }
1079 
createResourceTrackerService()1080   protected ResourceTrackerService createResourceTrackerService() {
1081     return new ResourceTrackerService(this.rmContext, this.nodesListManager,
1082         this.nmLivelinessMonitor,
1083         this.rmContext.getContainerTokenSecretManager(),
1084         this.rmContext.getNMTokenSecretManager());
1085   }
1086 
createClientRMService()1087   protected ClientRMService createClientRMService() {
1088     return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
1089         this.applicationACLsManager, this.queueACLsManager,
1090         this.rmContext.getRMDelegationTokenSecretManager());
1091   }
1092 
createApplicationMasterService()1093   protected ApplicationMasterService createApplicationMasterService() {
1094     return new ApplicationMasterService(this.rmContext, scheduler);
1095   }
1096 
createAdminService()1097   protected AdminService createAdminService() {
1098     return new AdminService(this, rmContext);
1099   }
1100 
createRMSecretManagerService()1101   protected RMSecretManagerService createRMSecretManagerService() {
1102     return new RMSecretManagerService(conf, rmContext);
1103   }
1104 
1105   @Private
getClientRMService()1106   public ClientRMService getClientRMService() {
1107     return this.clientRM;
1108   }
1109 
1110   /**
1111    * return the scheduler.
1112    * @return the scheduler for the Resource Manager.
1113    */
1114   @Private
getResourceScheduler()1115   public ResourceScheduler getResourceScheduler() {
1116     return this.scheduler;
1117   }
1118 
1119   /**
1120    * return the resource tracking component.
1121    * @return the resource tracking component.
1122    */
1123   @Private
getResourceTrackerService()1124   public ResourceTrackerService getResourceTrackerService() {
1125     return this.resourceTracker;
1126   }
1127 
1128   @Private
getApplicationMasterService()1129   public ApplicationMasterService getApplicationMasterService() {
1130     return this.masterService;
1131   }
1132 
1133   @Private
getApplicationACLsManager()1134   public ApplicationACLsManager getApplicationACLsManager() {
1135     return this.applicationACLsManager;
1136   }
1137 
1138   @Private
getQueueACLsManager()1139   public QueueACLsManager getQueueACLsManager() {
1140     return this.queueACLsManager;
1141   }
1142 
1143   @Private
getWebapp()1144   WebApp getWebapp() {
1145     return this.webApp;
1146   }
1147 
1148   @Override
recover(RMState state)1149   public void recover(RMState state) throws Exception {
1150     // recover RMdelegationTokenSecretManager
1151     rmContext.getRMDelegationTokenSecretManager().recover(state);
1152 
1153     // recover AMRMTokenSecretManager
1154     rmContext.getAMRMTokenSecretManager().recover(state);
1155 
1156     // recover applications
1157     rmAppManager.recover(state);
1158 
1159     setSchedulerRecoveryStartAndWaitTime(state, conf);
1160   }
1161 
main(String argv[])1162   public static void main(String argv[]) {
1163     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
1164     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
1165     try {
1166       Configuration conf = new YarnConfiguration();
1167       GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
1168       argv = hParser.getRemainingArgs();
1169       // If -format-state-store, then delete RMStateStore; else startup normally
1170       if (argv.length == 1 && argv[0].equals("-format-state-store")) {
1171         deleteRMStateStore(conf);
1172       } else {
1173         ResourceManager resourceManager = new ResourceManager();
1174         ShutdownHookManager.get().addShutdownHook(
1175           new CompositeServiceShutdownHook(resourceManager),
1176           SHUTDOWN_HOOK_PRIORITY);
1177         resourceManager.init(conf);
1178         resourceManager.start();
1179       }
1180     } catch (Throwable t) {
1181       LOG.fatal("Error starting ResourceManager", t);
1182       System.exit(-1);
1183     }
1184   }
1185 
1186   /**
1187    * Register the handlers for alwaysOn services
1188    */
setupDispatcher()1189   private Dispatcher setupDispatcher() {
1190     Dispatcher dispatcher = createDispatcher();
1191     dispatcher.register(RMFatalEventType.class,
1192         new ResourceManager.RMFatalEventDispatcher());
1193     return dispatcher;
1194   }
1195 
resetDispatcher()1196   private void resetDispatcher() {
1197     Dispatcher dispatcher = setupDispatcher();
1198     ((Service)dispatcher).init(this.conf);
1199     ((Service)dispatcher).start();
1200     removeService((Service)rmDispatcher);
1201     // Need to stop previous rmDispatcher before assigning new dispatcher
1202     // otherwise causes "AsyncDispatcher event handler" thread leak
1203     ((Service) rmDispatcher).stop();
1204     rmDispatcher = dispatcher;
1205     addIfService(rmDispatcher);
1206     rmContext.setDispatcher(rmDispatcher);
1207   }
1208 
setSchedulerRecoveryStartAndWaitTime(RMState state, Configuration conf)1209   private void setSchedulerRecoveryStartAndWaitTime(RMState state,
1210       Configuration conf) {
1211     if (!state.getApplicationState().isEmpty()) {
1212       long waitTime =
1213           conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
1214             YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
1215       rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
1216     }
1217   }
1218 
1219   /**
1220    * Retrieve RM bind address from configuration
1221    *
1222    * @param conf
1223    * @return InetSocketAddress
1224    */
getBindAddress(Configuration conf)1225   public static InetSocketAddress getBindAddress(Configuration conf) {
1226     return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
1227       YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
1228   }
1229 
1230   /**
1231    * Deletes the RMStateStore
1232    *
1233    * @param conf
1234    * @throws Exception
1235    */
deleteRMStateStore(Configuration conf)1236   private static void deleteRMStateStore(Configuration conf) throws Exception {
1237     RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
1238     rmStore.init(conf);
1239     rmStore.start();
1240     try {
1241       LOG.info("Deleting ResourceManager state store...");
1242       rmStore.deleteStore();
1243       LOG.info("State store deleted");
1244     } finally {
1245       rmStore.stop();
1246     }
1247   }
1248 }
1249